1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "AbstractWriteLog.h" | |
5 | #include "include/buffer.h" | |
6 | #include "include/Context.h" | |
7 | #include "include/ceph_assert.h" | |
8 | #include "common/deleter.h" | |
9 | #include "common/dout.h" | |
10 | #include "common/environment.h" | |
11 | #include "common/errno.h" | |
12 | #include "common/hostname.h" | |
13 | #include "common/WorkQueue.h" | |
14 | #include "common/Timer.h" | |
15 | #include "common/perf_counters.h" | |
16 | #include "librbd/ImageCtx.h" | |
17 | #include "librbd/asio/ContextWQ.h" | |
18 | #include "librbd/cache/pwl/ImageCacheState.h" | |
19 | #include "librbd/cache/pwl/LogEntry.h" | |
20 | #include "librbd/plugin/Api.h" | |
21 | #include <map> | |
22 | #include <vector> | |
23 | ||
24 | #undef dout_subsys | |
25 | #define dout_subsys ceph_subsys_rbd_pwl | |
26 | #undef dout_prefix | |
27 | #define dout_prefix *_dout << "librbd::cache::pwl::AbstractWriteLog: " << this \ | |
28 | << " " << __func__ << ": " | |
29 | ||
30 | namespace librbd { | |
31 | namespace cache { | |
32 | namespace pwl { | |
33 | ||
34 | using namespace librbd::cache::pwl; | |
35 | ||
36 | typedef AbstractWriteLog<ImageCtx>::Extent Extent; | |
37 | typedef AbstractWriteLog<ImageCtx>::Extents Extents; | |
38 | ||
39 | template <typename I> | |
40 | AbstractWriteLog<I>::AbstractWriteLog( | |
41 | I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state, | |
42 | Builder<This> *builder, cache::ImageWritebackInterface& image_writeback, | |
43 | plugin::Api<I>& plugin_api) | |
44 | : m_builder(builder), | |
45 | m_write_log_guard(image_ctx.cct), | |
46 | m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name( | |
47 | "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))), | |
48 | m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name( | |
49 | "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))), | |
50 | m_thread_pool( | |
51 | image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool", | |
52 | "tp_pwl", 4, ""), | |
53 | m_cache_state(cache_state), | |
54 | m_image_ctx(image_ctx), | |
55 | m_log_pool_size(DEFAULT_POOL_SIZE), |
56 | m_image_writeback(image_writeback), |
57 | m_plugin_api(plugin_api), | |
58 | m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name( | |
59 | "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))), | |
60 | m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"), | |
61 | m_log_append_lock(ceph::make_mutex(pwl::unique_lock_name( | |
62 | "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))), | |
63 | m_lock(ceph::make_mutex(pwl::unique_lock_name( | |
64 | "librbd::cache::pwl::AbstractWriteLog::m_lock", this))), | |
65 | m_blocks_to_log_entries(image_ctx.cct), | |
66 | m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue", | |
67 | ceph::make_timespan( | |
68 | image_ctx.config.template get_val<uint64_t>( | |
69 | "rbd_op_thread_timeout")), | |
70 | &m_thread_pool) | |
71 | { | |
72 | CephContext *cct = m_image_ctx.cct; | |
73 | m_plugin_api.get_image_timer_instance(cct, &m_timer, &m_timer_lock); | |
74 | } | |
75 | ||
76 | template <typename I> | |
77 | AbstractWriteLog<I>::~AbstractWriteLog() { | |
78 | ldout(m_image_ctx.cct, 15) << "enter" << dendl; | |
79 | { | |
80 | std::lock_guard timer_locker(*m_timer_lock); | |
81 | std::lock_guard locker(m_lock); | |
82 | m_timer->cancel_event(m_timer_ctx); | |
83 | m_thread_pool.stop(); | |
84 | ceph_assert(m_deferred_ios.size() == 0); | |
85 | ceph_assert(m_ops_to_flush.size() == 0); | |
86 | ceph_assert(m_ops_to_append.size() == 0); | |
87 | ceph_assert(m_flush_ops_in_flight == 0); | |
88 | ||
89 | delete m_cache_state; | |
90 | m_cache_state = nullptr; | |
91 | } | |
92 | ldout(m_image_ctx.cct, 15) << "exit" << dendl; | |
93 | } | |
94 | ||
95 | template <typename I> | |
96 | void AbstractWriteLog<I>::perf_start(std::string name) { | |
97 | PerfCountersBuilder plb(m_image_ctx.cct, name, l_librbd_pwl_first, | |
98 | l_librbd_pwl_last); | |
99 | ||
100 | // Latency axis configuration for op histograms, values are in nanoseconds | |
101 | PerfHistogramCommon::axis_config_d op_hist_x_axis_config{ | |
102 | "Latency (nsec)", | |
103 | PerfHistogramCommon::SCALE_LOG2, ///< Latency in logarithmic scale | |
104 | 0, ///< Start at 0 | |
105 | 5000, ///< Quantization unit is 5usec | |
106 | 16, ///< Ranges into the mS | |
107 | }; | |
108 | ||
109 | // Syncpoint logentry number x-axis configuration for op histograms | |
110 | PerfHistogramCommon::axis_config_d sp_logentry_number_config{ | |
111 | "logentry number", | |
112 | PerfHistogramCommon::SCALE_LINEAR, // log entry number in linear scale | |
113 | 0, // Start at 0 | |
114 | 1, // Quantization unit is 1 | |
115 | 260, // Up to 260 > (MAX_WRITES_PER_SYNC_POINT) | |
116 | }; | |
117 | ||
118 | // Syncpoint bytes number y-axis configuration for op histogram | |
119 | PerfHistogramCommon::axis_config_d sp_bytes_number_config{ | |
120 | "Number of SyncPoint", | |
121 | PerfHistogramCommon::SCALE_LOG2, // Request size in logarithmic scale | |
122 | 0, // Start at 0 | |
123 | 512, // Quantization unit is 512 | |
124 | 17, // Writes up to 8M >= MAX_BYTES_PER_SYNC_POINT | |
125 | }; | |
126 | ||
127 | // Op size axis configuration for op histogram y axis, values are in bytes | |
128 | PerfHistogramCommon::axis_config_d op_hist_y_axis_config{ | |
129 | "Request size (bytes)", | |
130 | PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale | |
131 | 0, ///< Start at 0 | |
132 | 512, ///< Quantization unit is 512 bytes | |
133 | 16, ///< Writes up to >32k | |
134 | }; | |
135 | ||
136 | // Num items configuration for op histogram y axis, values are in items | |
137 | PerfHistogramCommon::axis_config_d op_hist_y_axis_count_config{ | |
138 | "Number of items", | |
139 | PerfHistogramCommon::SCALE_LINEAR, ///< Request size in linear scale | |
140 | 0, ///< Start at 0 | |
141 | 1, ///< Quantization unit is 1 | |
142 | 32, ///< Up to 32 items | |
143 | }; | |
144 | ||
145 | plb.add_u64_counter(l_librbd_pwl_rd_req, "rd", "Reads"); | |
146 | plb.add_u64_counter(l_librbd_pwl_rd_bytes, "rd_bytes", "Data size in reads"); | |
147 | plb.add_time_avg(l_librbd_pwl_rd_latency, "rd_latency", "Latency of reads"); | |
148 | ||
149 | plb.add_u64_counter(l_librbd_pwl_rd_hit_req, "hit_rd", "Reads completely hitting RWL"); | |
150 | plb.add_u64_counter(l_librbd_pwl_rd_hit_bytes, "rd_hit_bytes", "Bytes read from RWL"); | |
151 | plb.add_time_avg(l_librbd_pwl_rd_hit_latency, "hit_rd_latency", "Latency of read hits"); | |
152 | ||
153 | plb.add_u64_counter(l_librbd_pwl_rd_part_hit_req, "part_hit_rd", "reads partially hitting RWL"); | |
154 | ||
155 | plb.add_u64_counter_histogram( | |
156 | l_librbd_pwl_syncpoint_hist, "syncpoint_logentry_bytes_histogram", | |
157 | sp_logentry_number_config, sp_bytes_number_config, | |
158 | "Histogram of syncpoint's logentry numbers vs bytes number"); | |
159 | ||
160 | plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes"); | |
161 | plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes"); |
162 | plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources"); |
163 | plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes"); | |
164 | plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries"); | |
165 | plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers"); | |
166 | plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes"); | |
167 | plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)"); | |
168 | |
169 | plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends"); | |
170 | plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes"); | |
171 | ||
172 | plb.add_time_avg( | |
173 | l_librbd_pwl_req_arr_to_all_t, "req_arr_to_all_t", | |
174 | "Average arrival to allocation time (time deferred for overlap)"); | |
175 | plb.add_time_avg( | |
176 | l_librbd_pwl_req_arr_to_dis_t, "req_arr_to_dis_t", | |
177 | "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); | |
178 | plb.add_time_avg( | |
179 | l_librbd_pwl_req_all_to_dis_t, "req_all_to_dis_t", | |
180 | "Average allocation to dispatch time (time deferred for log resources)"); | |
181 | plb.add_time_avg( | |
182 | l_librbd_pwl_wr_latency, "wr_latency", | |
183 | "Latency of writes (persistent completion)"); | |
184 | plb.add_u64_counter_histogram( | |
185 | l_librbd_pwl_wr_latency_hist, "wr_latency_bytes_histogram", | |
186 | op_hist_x_axis_config, op_hist_y_axis_config, | |
187 | "Histogram of write request latency (nanoseconds) vs. bytes written"); | |
188 | plb.add_time_avg( | |
189 | l_librbd_pwl_wr_caller_latency, "caller_wr_latency", | |
190 | "Latency of write completion to caller"); | |
191 | plb.add_time_avg( | |
192 | l_librbd_pwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t", | |
193 | "Average arrival to allocation time (time deferred for overlap)"); | |
194 | plb.add_time_avg( | |
195 | l_librbd_pwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t", | |
196 | "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); | |
197 | plb.add_time_avg( | |
198 | l_librbd_pwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t", | |
199 | "Average allocation to dispatch time (time deferred for log resources)"); | |
200 | plb.add_time_avg( | |
201 | l_librbd_pwl_nowait_wr_latency, "wr_latency_nw", | |
202 | "Latency of writes (persistent completion) not deferred for free space"); | |
203 | plb.add_u64_counter_histogram( | |
204 | l_librbd_pwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram", | |
205 | op_hist_x_axis_config, op_hist_y_axis_config, | |
206 | "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space"); | |
207 | plb.add_time_avg( | |
208 | l_librbd_pwl_nowait_wr_caller_latency, "caller_wr_latency_nw", | |
209 | "Latency of write completion to callerfor writes not deferred for free space"); | |
210 | plb.add_time_avg(l_librbd_pwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time"); | |
211 | plb.add_u64_counter_histogram( | |
212 | l_librbd_pwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram", | |
213 | op_hist_x_axis_config, op_hist_y_axis_config, | |
214 | "Histogram of buffer pmemobj_reserve() time (nanoseconds) vs. bytes written"); | |
215 | plb.add_time_avg(l_librbd_pwl_log_op_dis_to_buf_t, "op_dis_to_buf_t", "Average dispatch to buffer persist time"); | |
216 | plb.add_time_avg(l_librbd_pwl_log_op_dis_to_app_t, "op_dis_to_app_t", "Average dispatch to log append time"); | |
217 | plb.add_time_avg(l_librbd_pwl_log_op_dis_to_cmp_t, "op_dis_to_cmp_t", "Average dispatch to persist completion time"); | |
218 | plb.add_u64_counter_histogram( | |
219 | l_librbd_pwl_log_op_dis_to_cmp_t_hist, "op_dis_to_cmp_t_bytes_histogram", | |
220 | op_hist_x_axis_config, op_hist_y_axis_config, | |
221 | "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written"); | |
222 | ||
223 | plb.add_time_avg( | |
224 | l_librbd_pwl_log_op_buf_to_app_t, "op_buf_to_app_t", | |
225 | "Average buffer persist to log append time (write data persist/replicate + wait for append time)"); | |
226 | plb.add_time_avg( | |
227 | l_librbd_pwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t", | |
228 | "Average buffer persist time (write data persist/replicate time)"); | |
229 | plb.add_u64_counter_histogram( | |
230 | l_librbd_pwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram", | |
231 | op_hist_x_axis_config, op_hist_y_axis_config, | |
232 | "Histogram of write buffer persist time (nanoseconds) vs. bytes written"); | |
233 | plb.add_time_avg( | |
234 | l_librbd_pwl_log_op_app_to_cmp_t, "op_app_to_cmp_t", | |
235 | "Average log append to persist complete time (log entry append/replicate + wait for complete time)"); | |
236 | plb.add_time_avg( | |
237 | l_librbd_pwl_log_op_app_to_appc_t, "op_app_to_appc_t", | |
238 | "Average log append to persist complete time (log entry append/replicate time)"); | |
239 | plb.add_u64_counter_histogram( | |
240 | l_librbd_pwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram", | |
241 | op_hist_x_axis_config, op_hist_y_axis_config, | |
242 | "Histogram of log append persist time (nanoseconds) (vs. op bytes)"); | |
243 | ||
244 | plb.add_u64_counter(l_librbd_pwl_discard, "discard", "Discards"); | |
245 | plb.add_u64_counter(l_librbd_pwl_discard_bytes, "discard_bytes", "Bytes discarded"); | |
246 | plb.add_time_avg(l_librbd_pwl_discard_latency, "discard_lat", "Discard latency"); | |
247 | ||
248 | plb.add_u64_counter(l_librbd_pwl_aio_flush, "aio_flush", "AIO flush (flush to RWL)"); | |
249 | plb.add_u64_counter(l_librbd_pwl_aio_flush_def, "aio_flush_def", "AIO flushes deferred for resources"); | |
250 | plb.add_time_avg(l_librbd_pwl_aio_flush_latency, "aio_flush_lat", "AIO flush latency"); | |
251 | ||
252 | plb.add_u64_counter(l_librbd_pwl_ws,"ws", "Write Sames"); | |
253 | plb.add_u64_counter(l_librbd_pwl_ws_bytes, "ws_bytes", "Write Same bytes to image"); | |
254 | plb.add_time_avg(l_librbd_pwl_ws_latency, "ws_lat", "Write Same latency"); | |
255 | ||
256 | plb.add_u64_counter(l_librbd_pwl_cmp, "cmp", "Compare and Write requests"); | |
257 | plb.add_u64_counter(l_librbd_pwl_cmp_bytes, "cmp_bytes", "Compare and Write bytes compared/written"); | |
258 | plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latency"); | |
259 | plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails"); | |
260 | ||
261 | plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)"); |
262 | plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "write back to OSD latency"); | |
263 | plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL"); |
264 | plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "discard", "Discard and invalidate RWL"); | |
265 | ||
266 | plb.add_time_avg(l_librbd_pwl_append_tx_t, "append_tx_lat", "Log append transaction latency"); | |
267 | plb.add_u64_counter_histogram( | |
268 | l_librbd_pwl_append_tx_t_hist, "append_tx_lat_histogram", | |
269 | op_hist_x_axis_config, op_hist_y_axis_count_config, | |
270 | "Histogram of log append transaction time (nanoseconds) vs. entries appended"); | |
271 | plb.add_time_avg(l_librbd_pwl_retire_tx_t, "retire_tx_lat", "Log retire transaction latency"); | |
272 | plb.add_u64_counter_histogram( | |
273 | l_librbd_pwl_retire_tx_t_hist, "retire_tx_lat_histogram", | |
274 | op_hist_x_axis_config, op_hist_y_axis_count_config, | |
275 | "Histogram of log retire transaction time (nanoseconds) vs. entries retired"); | |
276 | ||
277 | m_perfcounter = plb.create_perf_counters(); | |
278 | m_image_ctx.cct->get_perfcounters_collection()->add(m_perfcounter); | |
279 | } | |
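/* Illustrative sketch (assumption, not this file's build logic): a simplified
 * view of what the axis_config_d blocks in perf_start() express. The real
 * bucketing is implemented by PerfHistogram; the helper below only shows how
 * the quantization unit, scale type and bucket count relate to a sampled value.
 */
namespace pwl_axis_example {
inline unsigned simplified_bucket(bool log2_scale, uint64_t quant_unit,
                                  unsigned bucket_count, uint64_t value) {
  if (value < quant_unit) {
    return 0;                       // bucket 0: below one quantization unit
  }
  uint64_t units = value / quant_unit;
  if (!log2_scale) {
    // Linear axis: one bucket per quantization unit, clamped to the last one.
    return (units < bucket_count - 1) ? static_cast<unsigned>(units)
                                      : bucket_count - 1;
  }
  unsigned bucket = 1;
  while (units > 1 && bucket < bucket_count - 1) {
    units >>= 1;                    // log2 axis: each bucket doubles the range
    ++bucket;
  }
  return bucket;
}
// e.g. for op_hist_x_axis_config (log2, 5000 ns unit, 16 buckets) a 40 usec
// latency (8 quantization units) lands four buckets up the scale.
} // namespace pwl_axis_example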
280 | ||
281 | template <typename I> | |
282 | void AbstractWriteLog<I>::perf_stop() { | |
283 | ceph_assert(m_perfcounter); | |
284 | m_image_ctx.cct->get_perfcounters_collection()->remove(m_perfcounter); | |
285 | delete m_perfcounter; | |
286 | } | |
287 | ||
288 | template <typename I> | |
289 | void AbstractWriteLog<I>::log_perf() { | |
290 | bufferlist bl; | |
291 | Formatter *f = Formatter::create("json-pretty"); | |
292 | bl.append("Perf dump follows\n--- Begin perf dump ---\n"); | |
293 | bl.append("{\n"); | |
294 | stringstream ss; | |
295 | utime_t now = ceph_clock_now(); | |
296 | ss << "\"test_time\": \"" << now << "\","; | |
297 | ss << "\"image\": \"" << m_image_ctx.name << "\","; | |
298 | bl.append(ss); | |
299 | bl.append("\"stats\": "); | |
300 | m_image_ctx.cct->get_perfcounters_collection()->dump_formatted(f, 0); | |
301 | f->flush(bl); | |
302 | bl.append(",\n\"histograms\": "); | |
303 | m_image_ctx.cct->get_perfcounters_collection()->dump_formatted_histograms(f, 0); | |
304 | f->flush(bl); | |
305 | delete f; | |
306 | bl.append("}\n--- End perf dump ---\n"); | |
307 | bl.append('\0'); | |
308 | ldout(m_image_ctx.cct, 1) << bl.c_str() << dendl; | |
309 | } | |
310 | ||
311 | template <typename I> | |
312 | void AbstractWriteLog<I>::periodic_stats() { | |
313 | std::lock_guard locker(m_lock); | |
314 | ldout(m_image_ctx.cct, 1) << "STATS: m_log_entries=" << m_log_entries.size() |
315 | << ", m_dirty_log_entries=" << m_dirty_log_entries.size() | |
316 | << ", m_free_log_entries=" << m_free_log_entries | |
317 | << ", m_bytes_allocated=" << m_bytes_allocated | |
318 | << ", m_bytes_cached=" << m_bytes_cached | |
319 | << ", m_bytes_dirty=" << m_bytes_dirty | |
320 | << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated | |
321 | << ", m_first_valid_entry=" << m_first_valid_entry | |
322 | << ", m_first_free_entry=" << m_first_free_entry | |
323 | << ", m_current_sync_gen=" << m_current_sync_gen | |
324 | << ", m_flushed_sync_gen=" << m_flushed_sync_gen | |
325 | << dendl; |
326 | } | |
327 | ||
328 | template <typename I> | |
329 | void AbstractWriteLog<I>::arm_periodic_stats() { | |
330 | ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); | |
331 | if (m_periodic_stats_enabled) { | |
332 | m_timer_ctx = new LambdaContext( | |
333 | [this](int r) { | |
334 | /* m_timer_lock is held */ | |
335 | periodic_stats(); | |
336 | arm_periodic_stats(); | |
337 | }); | |
338 | m_timer->add_event_after(LOG_STATS_INTERVAL_SECONDS, m_timer_ctx); | |
339 | } | |
340 | } | |
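/* Illustrative sketch (hypothetical names): the self-rearming timer pattern
 * used by arm_periodic_stats() above. A LambdaContext is scheduled on the
 * SafeTimer; when it fires it does the periodic work and immediately
 * schedules its successor, so the work repeats until re-arming stops.
 */
namespace pwl_timer_example {
inline void schedule_example_tick(SafeTimer *example_timer,
                                  double interval_seconds) {
  // As in arm_periodic_stats(), the caller is expected to hold the timer
  // lock associated with example_timer when calling this.
  Context *tick = new LambdaContext([example_timer, interval_seconds](int) {
      // ... periodic work goes here (stats logging in the code above) ...
      schedule_example_tick(example_timer, interval_seconds);  // re-arm
    });
  example_timer->add_event_after(interval_seconds, tick);
}
} // namespace pwl_timer_example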
341 | ||
342 | template <typename I> | |
343 | void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry, | |
344 | WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points, | |
345 | std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries, | |
346 | uint64_t entry_index) { |
347 | bool writer = cache_entry->is_writer(); |
348 | if (cache_entry->is_sync_point()) { | |
349 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
350 | << " is a sync point. cache_entry=[" << *cache_entry << "]" << dendl; | |
351 | auto sync_point_entry = std::make_shared<SyncPointLogEntry>(cache_entry->sync_gen_number); | |
352 | *log_entry = sync_point_entry; | |
353 | sync_point_entries[cache_entry->sync_gen_number] = sync_point_entry; | |
354 | missing_sync_points.erase(cache_entry->sync_gen_number); | |
355 | m_current_sync_gen = cache_entry->sync_gen_number; | |
356 | } else if (cache_entry->is_write()) { | |
357 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
358 | << " is a write. cache_entry=[" << *cache_entry << "]" << dendl; | |
359 | auto write_entry = | |
360 | m_builder->create_write_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes); | |
361 | write_data_to_buffer(write_entry, cache_entry); | |
362 | *log_entry = write_entry; | |
363 | } else if (cache_entry->is_writesame()) { | |
364 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
365 | << " is a write same. cache_entry=[" << *cache_entry << "]" << dendl; | |
366 | auto ws_entry = | |
367 | m_builder->create_writesame_log_entry(nullptr, cache_entry->image_offset_bytes, | |
368 | cache_entry->write_bytes, cache_entry->ws_datalen); | |
369 | write_data_to_buffer(ws_entry, cache_entry); | |
370 | *log_entry = ws_entry; | |
371 | } else if (cache_entry->is_discard()) { | |
372 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
373 | << " is a discard. cache_entry=[" << *cache_entry << "]" << dendl; | |
374 | auto discard_entry = | |
375 | std::make_shared<DiscardLogEntry>(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes, | |
376 | m_discard_granularity_bytes); | |
377 | *log_entry = discard_entry; | |
378 | } else { | |
379 | lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index | |
380 | << ", cache_entry=[" << *cache_entry << "]" << dendl; | |
381 | } | |
382 | ||
383 | if (writer) { | |
384 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
385 | << " writes. cache_entry=[" << *cache_entry << "]" << dendl; | |
386 | if (!sync_point_entries[cache_entry->sync_gen_number]) { | |
387 | missing_sync_points[cache_entry->sync_gen_number] = true; | |
388 | } | |
389 | } | |
390 | } | |
391 | ||
392 | template <typename I> | |
393 | void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points, | |
394 | std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries, | |
395 | DeferredContexts &later) { |
396 | /* Create missing sync points. These must not be appended until the |
397 | * entry reload is complete and the write map is up to | |
398 | * date. Currently this is handled by the deferred contexts object | |
399 | * passed to new_sync_point(). These contexts won't be completed | |
400 | * until this function returns. */ | |
401 | for (auto &kv : missing_sync_points) { | |
402 | ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl; | |
403 | if (0 == m_current_sync_gen) { | |
404 | /* The unlikely case where the log contains writing entries, but no sync | |
405 | * points (e.g. because they were all retired) */ | |
406 | m_current_sync_gen = kv.first-1; | |
407 | } | |
408 | ceph_assert(kv.first == m_current_sync_gen+1); | |
409 | init_flush_new_sync_point(later); | |
410 | ceph_assert(kv.first == m_current_sync_gen); | |
411 | sync_point_entries[kv.first] = m_current_sync_point->log_entry; | |
412 | } | |
413 | ||
414 | /* | |
415 | * Iterate over the log entries again (this time via the global | |
416 | * entries list), connecting write entries to their sync points and | |
417 | * updating the sync point stats. | |
418 | * | |
419 | * Add writes to the write log map. | |
420 | */ | |
421 | std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr; | |
422 | for (auto &log_entry : m_log_entries) { | |
423 | if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) { | |
424 | /* This entry is one of the types that write */ | |
425 | auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry); | |
426 | if (gen_write_entry) { | |
427 | auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number]; | |
428 | if (!sync_point_entry) { | |
429 | lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl; | |
430 | ceph_assert(false); | |
431 | } else { | |
432 | gen_write_entry->sync_point_entry = sync_point_entry; | |
433 | sync_point_entry->writes++; | |
434 | sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes; | |
435 | sync_point_entry->writes_completed++; | |
436 | m_blocks_to_log_entries.add_log_entry(gen_write_entry); | |
437 | /* This entry is only dirty if its sync gen number is > the flushed | |
438 | * sync gen number from the root object. */ | |
439 | if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) { | |
440 | m_dirty_log_entries.push_back(log_entry); | |
441 | m_bytes_dirty += gen_write_entry->bytes_dirty(); | |
442 | } else { | |
443 | gen_write_entry->set_flushed(true); | |
444 | sync_point_entry->writes_flushed++; | |
445 | } | |
446 | |
447 | /* calc m_bytes_allocated & m_bytes_cached */ | |
448 | inc_allocated_cached_bytes(log_entry); | |
449 | } |
450 | } | |
451 | } else { | |
452 | /* This entry is a sync point entry */ | |
453 | auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry); | |
454 | if (sync_point_entry) { | |
455 | if (previous_sync_point_entry) { | |
456 | previous_sync_point_entry->next_sync_point_entry = sync_point_entry; | |
457 | if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) { | |
458 | sync_point_entry->prior_sync_point_flushed = false; | |
459 | ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed || | |
460 | (0 == previous_sync_point_entry->writes) || | |
461 | (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed)); | |
462 | } else { | |
463 | sync_point_entry->prior_sync_point_flushed = true; | |
464 | ceph_assert(previous_sync_point_entry->prior_sync_point_flushed); | |
465 | ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed); | |
466 | } | |
467 | } else { | |
468 | /* There are no previous sync points, so we'll consider them flushed */ | |
469 | sync_point_entry->prior_sync_point_flushed = true; | |
470 | } | |
471 | previous_sync_point_entry = sync_point_entry; | |
472 | ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << "]" << dendl; | |
473 | } | |
474 | } | |
475 | } | |
476 | if (0 == m_current_sync_gen) { | |
477 | /* If a re-opened log was completely flushed, we'll have found no sync point entries here, | |
478 | * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync | |
479 | * point recorded in the log. */ | |
480 | m_current_sync_gen = m_flushed_sync_gen; | |
481 | } | |
482 | } | |
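/* Illustrative sketch (simplified assumption): the rule applied above when
 * reloading entries. A recovered write entry goes back onto
 * m_dirty_log_entries only if it belongs to a sync generation newer than the
 * last generation recorded as flushed in the pool root; otherwise it is
 * marked flushed immediately.
 */
namespace pwl_reload_example {
inline bool entry_is_dirty_on_reload(uint64_t entry_sync_gen,
                                     uint64_t flushed_sync_gen) {
  // e.g. with flushed_sync_gen == 7, entries from sync gen 8 and later are
  // still dirty; entries from sync gen 7 and earlier were already written
  // back to the image before the cache was last closed.
  return entry_sync_gen > flushed_sync_gen;
}
} // namespace pwl_reload_example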
483 | ||
484 | template <typename I> | |
485 | void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) { | |
486 | CephContext *cct = m_image_ctx.cct; | |
487 | ldout(cct, 20) << dendl; | |
488 | ceph_assert(m_cache_state); | |
489 | std::lock_guard locker(m_lock); | |
490 | ceph_assert(!m_initialized); | |
491 | ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl; | |
492 | ||
493 | if (!m_cache_state->present) { | |
494 | m_cache_state->host = ceph_get_short_hostname(); | |
495 | m_cache_state->size = m_image_ctx.config.template get_val<uint64_t>( | |
496 | "rbd_persistent_cache_size"); | |
497 | ||
498 | string path = m_image_ctx.config.template get_val<string>( | |
499 | "rbd_persistent_cache_path"); | |
500 | std::string pool_name = m_image_ctx.md_ctx.get_pool_name(); | |
501 | m_cache_state->path = path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".pool"; | |
502 | } | |
503 | ||
504 | ldout(cct,5) << "pwl_size: " << m_cache_state->size << dendl; | |
505 | ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl; | |
506 | ||
507 | m_log_pool_name = m_cache_state->path; | |
508 | m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE); |
509 | m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN); | |
510 | ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size | |
511 | << " (adjusted from " << m_cache_state->size << ")" << dendl; | |
512 | |
513 | if ((!m_cache_state->present) && | |
514 | (access(m_log_pool_name.c_str(), F_OK) == 0)) { | |
515 | ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name | |
516 | << ", While there's no cache in the image metatata." << dendl; | |
517 | if (remove(m_log_pool_name.c_str()) != 0) { | |
518 | lderr(cct) << "Failed to remove the pool file " << m_log_pool_name | |
519 | << dendl; | |
520 | on_finish->complete(-errno); | |
521 | return; | |
522 | } else { | |
523 | ldout(cct, 5) << "Removed the existing pool file." << dendl; | |
524 | } | |
525 | } else if ((m_cache_state->present) && | |
526 | (access(m_log_pool_name.c_str(), F_OK) != 0)) { | |
527 | ldout(cct, 5) << "Can't find the existed pool file " << m_log_pool_name << dendl; | |
528 | on_finish->complete(-errno); | |
529 | return; |
530 | } |
531 | ||
532 | bool succeeded = initialize_pool(on_finish, later); |
533 | if (!succeeded) { | |
534 | return; | |
535 | } | |
536 | |
537 | ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries | |
538 | << " log entries, " << m_free_log_entries << " of which are free." | |
539 | << " first_valid=" << m_first_valid_entry | |
540 | << ", first_free=" << m_first_free_entry | |
541 | << ", flushed_sync_gen=" << m_flushed_sync_gen | |
542 | << ", m_current_sync_gen=" << m_current_sync_gen << dendl; | |
543 | if (m_first_free_entry == m_first_valid_entry) { | |
544 | ldout(cct,1) << "write log is empty" << dendl; | |
545 | m_cache_state->empty = true; | |
546 | } | |
547 | ||
548 | /* Start the sync point following the last one seen in the | |
549 | * log. Flush the last sync point created during the loading of the | |
550 | * existing log entries. */ | |
551 | init_flush_new_sync_point(later); | |
552 | ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl; | |
553 | ||
554 | m_initialized = true; | |
555 | // Start the thread | |
556 | m_thread_pool.start(); | |
557 | ||
558 | m_periodic_stats_enabled = m_cache_state->log_periodic_stats; | |
559 | /* Do these after we drop lock */ | |
560 | later.add(new LambdaContext([this](int r) { | |
561 | if (m_periodic_stats_enabled) { | |
562 | /* Log stats for the first time */ | |
563 | periodic_stats(); | |
564 | /* Arm periodic stats logging for the first time */ | |
565 | std::lock_guard timer_locker(*m_timer_lock); | |
566 | arm_periodic_stats(); | |
567 | } | |
568 | })); | |
569 | m_image_ctx.op_work_queue->queue(on_finish, 0); | |
570 | } | |
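/* Illustrative sketch (hypothetical values): how pwl_init() above composes
 * the cache pool file name from rbd_persistent_cache_path, the RADOS pool
 * name and the image id when no cache is recorded in the image metadata.
 */
namespace pwl_path_example {
inline std::string example_pool_file(const std::string &cache_dir,
                                     const std::string &rados_pool,
                                     const std::string &image_id) {
  // e.g. cache_dir "/mnt/pmem", pool "rbd", image id "10206b8b4567"
  // (all hypothetical) => "/mnt/pmem/rbd-pwl.rbd.10206b8b4567.pool"
  return cache_dir + "/rbd-pwl." + rados_pool + "." + image_id + ".pool";
}
} // namespace pwl_path_example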
571 | ||
572 | template <typename I> | |
573 | void AbstractWriteLog<I>::update_image_cache_state(Context *on_finish) { | |
574 | m_cache_state->write_image_cache_state(on_finish); | |
575 | } | |
576 | ||
577 | template <typename I> | |
578 | void AbstractWriteLog<I>::init(Context *on_finish) { | |
579 | CephContext *cct = m_image_ctx.cct; | |
580 | ldout(cct, 20) << dendl; | |
581 | auto pname = std::string("librbd-pwl-") + m_image_ctx.id + |
582 | std::string("-") + m_image_ctx.md_ctx.get_pool_name() + | |
583 | std::string("-") + m_image_ctx.name; | |
584 | perf_start(pname); | |
585 | |
586 | ceph_assert(!m_initialized); | |
587 | ||
588 | Context *ctx = new LambdaContext( | |
589 | [this, on_finish](int r) { | |
590 | if (r >= 0) { | |
591 | update_image_cache_state(on_finish); | |
592 | } else { | |
593 | on_finish->complete(r); | |
594 | } | |
595 | }); | |
596 | ||
597 | DeferredContexts later; | |
598 | pwl_init(ctx, later); | |
599 | } | |
600 | ||
601 | template <typename I> | |
602 | void AbstractWriteLog<I>::shut_down(Context *on_finish) { | |
603 | CephContext *cct = m_image_ctx.cct; | |
604 | ldout(cct, 20) << dendl; | |
605 | ||
606 | ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl; | |
607 | ||
608 | Context *ctx = new LambdaContext( | |
609 | [this, on_finish](int r) { | |
610 | ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl; | |
611 | m_image_ctx.op_work_queue->queue(on_finish, r); | |
612 | }); | |
613 | ctx = new LambdaContext( | |
614 | [this, ctx](int r) { | |
615 | ldout(m_image_ctx.cct, 6) << "image cache cleaned" << dendl; | |
616 | Context *next_ctx = override_ctx(r, ctx); | |
617 | bool periodic_stats_enabled = m_periodic_stats_enabled; | |
618 | m_periodic_stats_enabled = false; | |
619 | ||
620 | if (periodic_stats_enabled) { | |
621 | /* Log stats one last time if they were enabled */ | |
622 | periodic_stats(); | |
623 | } | |
624 | { | |
625 | std::lock_guard locker(m_lock); | |
626 | check_image_cache_state_clean(); |
627 | m_wake_up_enabled = false; |
628 | m_cache_state->clean = true; | |
629 | m_log_entries.clear(); | |
630 | ||
631 | remove_pool_file(); | |
632 | ||
633 | if (m_perfcounter) { | |
634 | perf_stop(); | |
635 | } | |
636 | } | |
637 | update_image_cache_state(next_ctx); | |
638 | }); | |
639 | ctx = new LambdaContext( |
640 | [this, ctx](int r) { | |
641 | Context *next_ctx = override_ctx(r, ctx); | |
642 | ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl; | |
643 | // Wait for in progress IOs to complete | |
644 | next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx); | |
645 | m_async_op_tracker.wait_for_ops(next_ctx); | |
646 | }); | |
647 | ctx = new LambdaContext( |
648 | [this, ctx](int r) { | |
649 | Context *next_ctx = override_ctx(r, ctx); | |
650 | { | |
651 | /* Sync with process_writeback_dirty_entries() */ | |
652 | RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock); | |
653 | m_shutting_down = true; | |
654 | /* Flush all writes to OSDs (unless disabled) and wait for all | |
655 | in-progress flush writes to complete */ | |
656 | ldout(m_image_ctx.cct, 6) << "flushing" << dendl; | |
657 | if (m_periodic_stats_enabled) { | |
658 | periodic_stats(); | |
659 | } | |
660 | } | |
661 | flush_dirty_entries(next_ctx); | |
662 | }); | |
663 | ctx = new LambdaContext( |
664 | [this, ctx](int r) { | |
665 | ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl; | |
666 | m_work_queue.queue(ctx, r); | |
667 | }); | |
668 | /* Complete all in-flight writes before shutting down */ | |
669 | ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl; | |
670 | internal_flush(false, ctx); | |
671 | } | |
672 | ||
673 | template <typename I> | |
674 | void AbstractWriteLog<I>::read(Extents&& image_extents, | |
675 | ceph::bufferlist* bl, | |
676 | int fadvise_flags, Context *on_finish) { | |
677 | CephContext *cct = m_image_ctx.cct; | |
678 | utime_t now = ceph_clock_now(); | |
679 | ||
680 | on_finish = new LambdaContext( | |
681 | [this, on_finish](int r) { | |
682 | m_async_op_tracker.finish_op(); | |
683 | on_finish->complete(r); | |
684 | }); | |
685 | C_ReadRequest *read_ctx = m_builder->create_read_request( | |
686 | cct, now, m_perfcounter, bl, on_finish); | |
687 | ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id | |
688 | << "image_extents=" << image_extents << ", " | |
689 | << "bl=" << bl << ", " | |
690 | << "on_finish=" << on_finish << dendl; | |
691 | ||
692 | ceph_assert(m_initialized); | |
693 | bl->clear(); | |
694 | m_perfcounter->inc(l_librbd_pwl_rd_req, 1); | |
695 | ||
696 | std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read; |
697 | std::vector<bufferlist*> bls_to_read; |
698 | ||
699 | m_async_op_tracker.start_op(); | |
700 | Context *ctx = new LambdaContext( | |
701 | [this, read_ctx, fadvise_flags](int r) { | |
702 | if (read_ctx->miss_extents.empty()) { | |
703 | /* All of this read comes from RWL */ | |
704 | read_ctx->complete(0); | |
705 | } else { | |
706 | /* Pass the read misses on to the layer below RWL */ | |
707 | m_image_writeback.aio_read( | |
708 | std::move(read_ctx->miss_extents), &read_ctx->miss_bl, | |
709 | fadvise_flags, read_ctx); | |
710 | } | |
711 | }); | |
712 | ||
713 | /* | |
714 | * The strategy here is to look up all the WriteLogMapEntries that overlap | |
715 | * this read, and iterate through those to separate this read into hits and | |
716 | * misses. A new Extents object is produced here with Extents for each miss | |
717 | * region. The miss Extents is then passed on to the read cache below RWL. We | |
718 | * also produce an ImageExtentBufs for all the extents (hit or miss) in this | |
719 | * read. When the read from the lower cache layer completes, we iterate | |
720 | * through the ImageExtentBufs and insert buffers for each cache hit at the | |
721 | * appropriate spot in the bufferlist returned from below for the miss | |
722 | * read. The buffers we insert here refer directly to regions of various | |
723 | * write log entry data buffers. | |
724 | * | |
725 | * Locking: These buffer objects hold a reference on the write log entries | |
726 | * they refer to. Log entries can't be retired until there are no references. | |
727 | * The GenericWriteLogEntry references are released by the buffer destructor. | |
728 | */ | |
729 | for (auto &extent : image_extents) { | |
730 | uint64_t extent_offset = 0; | |
731 | RWLock::RLocker entry_reader_locker(m_entry_reader_lock); | |
732 | WriteLogMapEntries map_entries = m_blocks_to_log_entries.find_map_entries( | |
733 | block_extent(extent)); | |
734 | for (auto &map_entry : map_entries) { | |
735 | Extent entry_image_extent(pwl::image_extent(map_entry.block_extent)); | |
736 | /* If this map entry starts after the current image extent offset ... */ | |
737 | if (entry_image_extent.first > extent.first + extent_offset) { | |
738 | /* ... add range before map_entry to miss extents */ | |
739 | uint64_t miss_extent_start = extent.first + extent_offset; | |
740 | uint64_t miss_extent_length = entry_image_extent.first - | |
741 | miss_extent_start; | |
742 | Extent miss_extent(miss_extent_start, miss_extent_length); | |
743 | read_ctx->miss_extents.push_back(miss_extent); | |
744 | /* Add miss range to read extents */ | |
745 | auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent); | |
746 | read_ctx->read_extents.push_back(miss_extent_buf); | |
747 | extent_offset += miss_extent_length; | |
748 | } | |
749 | ceph_assert(entry_image_extent.first <= extent.first + extent_offset); | |
750 | uint64_t entry_offset = 0; | |
751 | /* If this map entry starts before the current image extent offset ... */ | |
752 | if (entry_image_extent.first < extent.first + extent_offset) { | |
753 | /* ... compute offset into log entry for this read extent */ | |
754 | entry_offset = (extent.first + extent_offset) - entry_image_extent.first; | |
755 | } | |
756 | /* This read hit ends at the end of the extent or the end of the log | |
757 | entry, whichever is less. */ | |
758 | uint64_t entry_hit_length = min(entry_image_extent.second - entry_offset, | |
759 | extent.second - extent_offset); | |
760 | Extent hit_extent(entry_image_extent.first, entry_hit_length); | |
761 | if (0 == map_entry.log_entry->write_bytes() && | |
762 | 0 < map_entry.log_entry->bytes_dirty()) { | |
763 | /* discard log entry */ | |
764 | ldout(cct, 20) << "discard log entry" << dendl; | |
765 | auto discard_entry = map_entry.log_entry; | |
766 | ldout(cct, 20) << "read hit on discard entry: log_entry=" | |
767 | << *discard_entry | |
768 | << dendl; | |
769 | /* Discards read as zero, so we'll construct a bufferlist of zeros */ | |
770 | bufferlist zero_bl; | |
771 | zero_bl.append_zero(entry_hit_length); | |
772 | /* Add hit extent to read extents */ | |
773 | auto hit_extent_buf = std::make_shared<ImageExtentBuf>( | |
774 | hit_extent, zero_bl); | |
775 | read_ctx->read_extents.push_back(hit_extent_buf); | |
776 | } else { | |
777 | ldout(cct, 20) << "write or writesame log entry" << dendl; | |
778 | /* write and writesame log entry */ | |
779 | /* Offset of the map entry into the log entry's buffer */ | |
780 | uint64_t map_entry_buffer_offset = entry_image_extent.first - | |
781 | map_entry.log_entry->ram_entry.image_offset_bytes; | |
782 | /* Offset into the log entry buffer of this read hit */ | |
783 | uint64_t read_buffer_offset = map_entry_buffer_offset + entry_offset; | |
784 | /* Create buffer object referring to pmem pool for this read hit */ | |
785 | collect_read_extents( | |
786 | read_buffer_offset, map_entry, log_entries_to_read, bls_to_read, | |
787 | entry_hit_length, hit_extent, read_ctx); | |
788 | } | |
789 | /* Exclude RWL hit range from buffer and extent */ | |
790 | extent_offset += entry_hit_length; | |
791 | ldout(cct, 20) << map_entry << dendl; | |
792 | } | |
793 | /* If the last map entry didn't consume the entire image extent ... */ | |
794 | if (extent.second > extent_offset) { | |
795 | /* ... add the rest of this extent to miss extents */ | |
796 | uint64_t miss_extent_start = extent.first + extent_offset; | |
797 | uint64_t miss_extent_length = extent.second - extent_offset; | |
798 | Extent miss_extent(miss_extent_start, miss_extent_length); | |
799 | read_ctx->miss_extents.push_back(miss_extent); | |
800 | /* Add miss range to read extents */ | |
801 | auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent); | |
802 | read_ctx->read_extents.push_back(miss_extent_buf); | |
803 | extent_offset += miss_extent_length; | |
804 | } | |
805 | } | |
806 | ||
807 | ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents << ", " | |
808 | << "miss_bl=" << read_ctx->miss_bl << dendl; | |
809 | ||
810 | complete_read(log_entries_to_read, bls_to_read, ctx); | |
811 | } | |
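/* Illustrative sketch (simplified, not the code path above): how a single
 * image extent is partitioned into cache-hit and cache-miss pieces. The real
 * read() walks WriteLogMapEntries and also handles discard entries and
 * buffer offsets; here the covered ranges are just plain, sorted,
 * non-overlapping extents.
 */
namespace pwl_read_example {
inline void split_hits_and_misses(const Extent &request,
                                  const std::vector<Extent> &covered_sorted,
                                  std::vector<Extent> *hits,
                                  std::vector<Extent> *misses) {
  uint64_t pos = request.first;
  const uint64_t end = request.first + request.second;
  for (const auto &c : covered_sorted) {
    if (pos >= end) {
      break;
    }
    if (c.first > pos) {
      // Gap before this covered range reads from the layer below the cache.
      uint64_t gap_end = (c.first < end) ? c.first : end;
      misses->emplace_back(pos, gap_end - pos);
      pos = gap_end;
    }
    uint64_t hit_end = c.first + c.second;
    if (hit_end > end) {
      hit_end = end;
    }
    if (hit_end > pos) {
      hits->emplace_back(pos, hit_end - pos);   // served from the write log
      pos = hit_end;
    }
  }
  if (pos < end) {
    misses->emplace_back(pos, end - pos);        // tail not covered at all
  }
}
} // namespace pwl_read_example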
812 | ||
813 | template <typename I> | |
814 | void AbstractWriteLog<I>::write(Extents &&image_extents, | |
815 | bufferlist&& bl, | |
816 | int fadvise_flags, | |
817 | Context *on_finish) { | |
818 | CephContext *cct = m_image_ctx.cct; | |
819 | ||
820 | ldout(cct, 20) << "aio_write" << dendl; | |
821 | ||
822 | utime_t now = ceph_clock_now(); | |
823 | m_perfcounter->inc(l_librbd_pwl_wr_req, 1); | |
824 | ||
825 | ceph_assert(m_initialized); | |
826 | ||
827 | /* Split large extents into smaller ones because PMDK's space management is |
828 | * imperfect and fragments the pool. The larger the variation in block |
829 | * sizes, the more likely fragmentation becomes, and the remaining space |
830 | * can no longer be allocated in large chunks. We plan to manage pmem |
831 | * space and allocation ourselves in the future. |
832 | */ |
833 | Extents split_image_extents; | |
834 | uint64_t max_extent_size = get_max_extent(); | |
835 | if (max_extent_size != 0) { | |
836 | for (auto extent : image_extents) { | |
837 | if (extent.second > max_extent_size) { | |
838 | uint64_t off = extent.first; | |
839 | uint64_t extent_bytes = extent.second; | |
840 | for (int i = 0; extent_bytes != 0; ++i) { | |
841 | Extent _ext; | |
842 | _ext.first = off + i * max_extent_size; | |
843 | _ext.second = std::min(max_extent_size, extent_bytes); | |
844 | extent_bytes = extent_bytes - _ext.second ; | |
845 | split_image_extents.emplace_back(_ext); | |
846 | } | |
847 | } else { | |
848 | split_image_extents.emplace_back(extent); | |
849 | } | |
850 | } | |
851 | } else { | |
852 | split_image_extents = image_extents; | |
853 | } | |
854 | ||
855 | C_WriteRequestT *write_req = | |
856 | m_builder->create_write_request(*this, now, std::move(split_image_extents), | |
857 | std::move(bl), fadvise_flags, m_lock, | |
858 | m_perfcounter, on_finish); | |
859 | m_perfcounter->inc(l_librbd_pwl_wr_bytes, | |
860 | write_req->image_extents_summary.total_bytes); | |
861 | ||
862 | /* The lambda below will be called when the block guard for all | |
863 | * blocks affected by this write is obtained */ | |
864 | GuardedRequestFunctionContext *guarded_ctx = | |
865 | new GuardedRequestFunctionContext([this, | |
866 | write_req](GuardedRequestFunctionContext &guard_ctx) { | |
867 | write_req->blockguard_acquired(guard_ctx); | |
868 | alloc_and_dispatch_io_req(write_req); | |
869 | }); | |
870 | ||
871 | detain_guarded_request(write_req, guarded_ctx, false); | |
872 | } | |
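/* Illustrative sketch (simplified): the extent splitting performed in write()
 * above so that no single log entry exceeds get_max_extent(). With a
 * hypothetical 8 MiB limit, a 20 MiB write at offset 0 becomes three pieces:
 * (0, 8M), (8M, 8M) and (16M, 4M).
 */
namespace pwl_split_example {
inline std::vector<Extent> split_extent(const Extent &extent,
                                        uint64_t max_extent_size) {
  std::vector<Extent> out;
  if (max_extent_size == 0 || extent.second <= max_extent_size) {
    out.push_back(extent);          // small enough: keep as a single piece
    return out;
  }
  uint64_t off = extent.first;
  uint64_t remaining = extent.second;
  while (remaining > 0) {
    uint64_t len = (remaining < max_extent_size) ? remaining : max_extent_size;
    out.emplace_back(off, len);
    off += len;
    remaining -= len;
  }
  return out;
}
} // namespace pwl_split_example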
873 | ||
874 | template <typename I> | |
875 | void AbstractWriteLog<I>::discard(uint64_t offset, uint64_t length, | |
876 | uint32_t discard_granularity_bytes, | |
877 | Context *on_finish) { | |
878 | CephContext *cct = m_image_ctx.cct; | |
879 | ||
880 | ldout(cct, 20) << dendl; | |
881 | ||
882 | utime_t now = ceph_clock_now(); | |
883 | m_perfcounter->inc(l_librbd_pwl_discard, 1); | |
884 | Extents discard_extents = {{offset, length}}; | |
885 | m_discard_granularity_bytes = discard_granularity_bytes; | |
886 | ||
887 | ceph_assert(m_initialized); | |
888 | ||
889 | auto *discard_req = | |
890 | new C_DiscardRequestT(*this, now, std::move(discard_extents), discard_granularity_bytes, | |
891 | m_lock, m_perfcounter, on_finish); | |
892 | ||
893 | /* The lambda below will be called when the block guard for all | |
894 | * blocks affected by this write is obtained */ | |
895 | GuardedRequestFunctionContext *guarded_ctx = | |
896 | new GuardedRequestFunctionContext([this, discard_req](GuardedRequestFunctionContext &guard_ctx) { | |
897 | discard_req->blockguard_acquired(guard_ctx); | |
898 | alloc_and_dispatch_io_req(discard_req); | |
899 | }); | |
900 | ||
901 | detain_guarded_request(discard_req, guarded_ctx, false); | |
902 | } | |
903 | ||
904 | /** | |
905 | * Aio_flush completes when all previously completed writes are | |
906 | * flushed to persistent cache. We make a best-effort attempt to also | |
907 | * defer until all in-progress writes complete, but we may not know | |
908 | * about all of the writes the application considers in-progress yet, | |
909 | * due to uncertainty in the IO submission workq (multiple WQ threads | |
910 | * may allow out-of-order submission). | |
911 | * | |
912 | * This flush operation will not wait for writes deferred for overlap | |
913 | * in the block guard. | |
914 | */ | |
915 | template <typename I> | |
916 | void AbstractWriteLog<I>::flush(io::FlushSource flush_source, Context *on_finish) { | |
917 | CephContext *cct = m_image_ctx.cct; | |
918 | ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl; | |
919 | ||
920 | if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source || | |
921 | io::FLUSH_SOURCE_WRITE_BLOCK == flush_source) { | |
922 | internal_flush(false, on_finish); | |
923 | return; | |
924 | } | |
925 | m_perfcounter->inc(l_librbd_pwl_aio_flush, 1); | |
926 | ||
927 | /* May be called even if initialization fails */ | |
928 | if (!m_initialized) { | |
929 | ldout(cct, 05) << "never initialized" << dendl; | |
930 | /* Deadlock if completed here */ | |
931 | m_image_ctx.op_work_queue->queue(on_finish, 0); | |
932 | return; | |
933 | } | |
934 | ||
935 | { | |
936 | std::shared_lock image_locker(m_image_ctx.image_lock); | |
937 | if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) { | |
938 | on_finish->complete(-EROFS); | |
939 | return; | |
940 | } | |
941 | } | |
942 | ||
943 | auto flush_req = make_flush_req(on_finish); | |
944 | ||
945 | GuardedRequestFunctionContext *guarded_ctx = | |
946 | new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) { | |
947 | ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl; | |
948 | ceph_assert(guard_ctx.cell); | |
949 | flush_req->detained = guard_ctx.state.detained; | |
950 | /* We don't call flush_req->set_cell(), because the block guard will be released here */ | |
951 | { | |
952 | DeferredContexts post_unlock; /* Do these when the lock below is released */ | |
953 | std::lock_guard locker(m_lock); | |
954 | ||
955 | if (!m_persist_on_flush && m_persist_on_write_until_flush) { | |
956 | m_persist_on_flush = true; | |
957 | ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl; | |
958 | } | |
959 | ||
960 | /* | |
961 | * Create a new sync point if there have been writes since the last | |
962 | * one. | |
963 | * | |
964 | * We do not flush the caches below the RWL here. | |
965 | */ | |
966 | flush_new_sync_point_if_needed(flush_req, post_unlock); | |
967 | } | |
968 | ||
969 | release_guarded_request(guard_ctx.cell); | |
970 | }); | |
971 | ||
972 | detain_guarded_request(flush_req, guarded_ctx, true); | |
973 | } | |
974 | ||
975 | template <typename I> | |
976 | void AbstractWriteLog<I>::writesame(uint64_t offset, uint64_t length, | |
977 | bufferlist&& bl, int fadvise_flags, | |
978 | Context *on_finish) { | |
979 | CephContext *cct = m_image_ctx.cct; | |
980 | ||
981 | ldout(cct, 20) << "aio_writesame" << dendl; | |
982 | ||
983 | utime_t now = ceph_clock_now(); | |
984 | Extents ws_extents = {{offset, length}}; | |
985 | m_perfcounter->inc(l_librbd_pwl_ws, 1); | |
986 | ceph_assert(m_initialized); | |
987 | ||
988 | /* A write same request is also a write request. The key difference is the | |
989 | * write same data buffer is shorter than the extent of the request. The full | |
990 | * extent will be used in the block guard, and appear in | |
991 | * m_blocks_to_log_entries_map. The data buffer allocated for the WS is only | |
992 | * as long as the length of the bl here, which is the pattern that's repeated | |
993 | * in the image for the entire length of this WS. Read hits and flushing of | |
994 | * write sames are different than normal writes. */ | |
995 | C_WriteSameRequestT *ws_req = | |
996 | m_builder->create_writesame_request(*this, now, std::move(ws_extents), std::move(bl), | |
997 | fadvise_flags, m_lock, m_perfcounter, on_finish); | |
998 | m_perfcounter->inc(l_librbd_pwl_ws_bytes, ws_req->image_extents_summary.total_bytes); | |
999 | ||
1000 | /* The lambda below will be called when the block guard for all | |
1001 | * blocks affected by this write is obtained */ | |
1002 | GuardedRequestFunctionContext *guarded_ctx = | |
1003 | new GuardedRequestFunctionContext([this, ws_req](GuardedRequestFunctionContext &guard_ctx) { | |
1004 | ws_req->blockguard_acquired(guard_ctx); | |
1005 | alloc_and_dispatch_io_req(ws_req); | |
1006 | }); | |
1007 | ||
1008 | detain_guarded_request(ws_req, guarded_ctx, false); | |
1009 | } | |
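/* Illustrative sketch (assumption): what the short write-same buffer described
 * above represents. Only one copy of the pattern is stored in the log entry;
 * conceptually it tiles across the full [offset, offset + length) extent, as
 * sketched below.
 */
namespace pwl_ws_example {
inline bufferlist expand_writesame_pattern(const bufferlist &pattern,
                                           uint64_t length) {
  bufferlist expanded;
  if (pattern.length() == 0) {
    return expanded;                       // nothing to repeat
  }
  while (expanded.length() < length) {
    expanded.append(pattern);              // repeat the pattern
  }
  if (expanded.length() > length) {
    bufferlist trimmed;
    trimmed.substr_of(expanded, 0, length);  // trim the final partial copy
    return trimmed;
  }
  return expanded;
}
} // namespace pwl_ws_example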
1010 | ||
1011 | template <typename I> | |
1012 | void AbstractWriteLog<I>::compare_and_write(Extents &&image_extents, | |
1013 | bufferlist&& cmp_bl, | |
1014 | bufferlist&& bl, | |
1015 | uint64_t *mismatch_offset, | |
1016 | int fadvise_flags, | |
1017 | Context *on_finish) { | |
1018 | ldout(m_image_ctx.cct, 20) << dendl; | |
1019 | ||
1020 | utime_t now = ceph_clock_now(); | |
1021 | m_perfcounter->inc(l_librbd_pwl_cmp, 1); | |
1022 | ceph_assert(m_initialized); | |
1023 | ||
1024 | /* A compare and write request is also a write request. We only allocate | |
1025 | * resources and dispatch this write request if the compare phase | |
1026 | * succeeds. */ | |
1027 | C_WriteRequestT *cw_req = | |
1028 | m_builder->create_comp_and_write_request( | |
1029 | *this, now, std::move(image_extents), std::move(cmp_bl), std::move(bl), | |
1030 | mismatch_offset, fadvise_flags, m_lock, m_perfcounter, on_finish); | |
1031 | m_perfcounter->inc(l_librbd_pwl_cmp_bytes, cw_req->image_extents_summary.total_bytes); | |
1032 | ||
1033 | /* The lambda below will be called when the block guard for all | |
1034 | * blocks affected by this write is obtained */ | |
1035 | GuardedRequestFunctionContext *guarded_ctx = | |
1036 | new GuardedRequestFunctionContext([this, cw_req](GuardedRequestFunctionContext &guard_ctx) { | |
1037 | cw_req->blockguard_acquired(guard_ctx); | |
1038 | ||
1039 | auto read_complete_ctx = new LambdaContext( | |
1040 | [this, cw_req](int r) { | |
1041 | ldout(m_image_ctx.cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id | |
1042 | << "cw_req=" << cw_req << dendl; | |
1043 | ||
1044 | /* Compare read_bl to cmp_bl to determine if this will produce a write */ | |
1045 | buffer::list aligned_read_bl; | |
1046 | if (cw_req->cmp_bl.length() < cw_req->read_bl.length()) { | |
1047 | aligned_read_bl.substr_of(cw_req->read_bl, 0, cw_req->cmp_bl.length()); | |
1048 | } | |
1049 | if (cw_req->cmp_bl.contents_equal(cw_req->read_bl) || | |
1050 | cw_req->cmp_bl.contents_equal(aligned_read_bl)) { | |
1051 | /* Compare phase succeeds. Begin write */ | |
1052 | ldout(m_image_ctx.cct, 5) << " cw_req=" << cw_req << " compare matched" << dendl; | |
1053 | cw_req->compare_succeeded = true; | |
1054 | *cw_req->mismatch_offset = 0; | |
1055 | /* Continue with this request as a write. Blockguard release and | |
1056 | * user request completion handled as if this were a plain | |
1057 | * write. */ | |
1058 | alloc_and_dispatch_io_req(cw_req); | |
1059 | } else { | |
1060 | /* Compare phase fails. Compare-and-write ends now. */ | |
1061 | ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " compare failed" << dendl; | |
1062 | /* Bufferlist doesn't tell us where they differed, so we'll have to determine that here */ | |
1063 | uint64_t bl_index = 0; | |
1064 | for (bl_index = 0; bl_index < cw_req->cmp_bl.length(); bl_index++) { | |
1065 | if (cw_req->cmp_bl[bl_index] != cw_req->read_bl[bl_index]) { | |
1066 | ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " mismatch at " << bl_index << dendl; | |
1067 | break; | |
1068 | } | |
1069 | } | |
1070 | cw_req->compare_succeeded = false; | |
1071 | *cw_req->mismatch_offset = bl_index; | |
1072 | cw_req->complete_user_request(-EILSEQ); | |
1073 | cw_req->release_cell(); | |
1074 | cw_req->complete(0); | |
1075 | } | |
1076 | }); | |
1077 | ||
1078 | /* Read phase of comp-and-write must read through RWL */ | |
1079 | Extents image_extents_copy = cw_req->image_extents; | |
1080 | read(std::move(image_extents_copy), &cw_req->read_bl, cw_req->fadvise_flags, read_complete_ctx); | |
1081 | }); | |
1082 | ||
1083 | detain_guarded_request(cw_req, guarded_ctx, false); | |
1084 | } | |
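/* Illustrative sketch (simplified): the compare-phase semantics implemented
 * in the lambda above. The data read through the cache is compared
 * byte-for-byte against cmp_bl; on the first difference the byte index is
 * reported via *mismatch_offset and the request fails with -EILSEQ instead
 * of turning into a write. read_bl is assumed to be at least as long as
 * cmp_bl, as it is in the code above.
 */
namespace pwl_caw_example {
inline int compare_phase(const bufferlist &cmp_bl, const bufferlist &read_bl,
                         uint64_t *mismatch_offset) {
  for (uint64_t i = 0; i < cmp_bl.length(); ++i) {
    if (cmp_bl[i] != read_bl[i]) {
      *mismatch_offset = i;     // first differing byte, as reported above
      return -EILSEQ;
    }
  }
  *mismatch_offset = 0;
  return 0;                     // match: the request proceeds as a write
}
} // namespace pwl_caw_example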
1085 | ||
1086 | template <typename I> | |
1087 | void AbstractWriteLog<I>::flush(Context *on_finish) { | |
1088 | internal_flush(false, on_finish); | |
1089 | } | |
1090 | ||
1091 | template <typename I> | |
1092 | void AbstractWriteLog<I>::invalidate(Context *on_finish) { | |
1093 | internal_flush(true, on_finish); | |
1094 | } | |
1095 | ||
1096 | template <typename I> | |
1097 | CephContext *AbstractWriteLog<I>::get_context() { | |
1098 | return m_image_ctx.cct; | |
1099 | } | |
1100 | ||
1101 | template <typename I> | |
1102 | BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req) | |
1103 | { | |
1104 | CephContext *cct = m_image_ctx.cct; | |
1105 | BlockGuardCell *cell; | |
1106 | ||
1107 | ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); | |
1108 | ldout(cct, 20) << dendl; | |
1109 | ||
1110 | int r = m_write_log_guard.detain(req.block_extent, &req, &cell); | |
1111 | ceph_assert(r>=0); | |
1112 | if (r > 0) { | |
1113 | ldout(cct, 20) << "detaining guarded request due to in-flight requests: " | |
1114 | << "req=" << req << dendl; | |
1115 | return nullptr; | |
1116 | } | |
1117 | ||
1118 | ldout(cct, 20) << "in-flight request cell: " << cell << dendl; | |
1119 | return cell; | |
1120 | } | |
1121 | ||
1122 | template <typename I> | |
1123 | BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_barrier_helper( | |
1124 | GuardedRequest &req) | |
1125 | { | |
1126 | BlockGuardCell *cell = nullptr; | |
1127 | ||
1128 | ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); | |
1129 | ldout(m_image_ctx.cct, 20) << dendl; | |
1130 | ||
1131 | if (m_barrier_in_progress) { | |
1132 | req.guard_ctx->state.queued = true; | |
1133 | m_awaiting_barrier.push_back(req); | |
1134 | } else { | |
1135 | bool barrier = req.guard_ctx->state.barrier; | |
1136 | if (barrier) { | |
1137 | m_barrier_in_progress = true; | |
1138 | req.guard_ctx->state.current_barrier = true; | |
1139 | } | |
1140 | cell = detain_guarded_request_helper(req); | |
1141 | if (barrier) { | |
1142 | /* Only non-null if the barrier acquires the guard now */ | |
1143 | m_barrier_cell = cell; | |
1144 | } | |
1145 | } | |
1146 | ||
1147 | return cell; | |
1148 | } | |
1149 | ||
1150 | template <typename I> | |
1151 | void AbstractWriteLog<I>::detain_guarded_request( | |
1152 | C_BlockIORequestT *request, | |
1153 | GuardedRequestFunctionContext *guarded_ctx, | |
1154 | bool is_barrier) | |
1155 | { | |
1156 | BlockExtent extent; | |
1157 | if (request) { | |
1158 | extent = request->image_extents_summary.block_extent(); | |
1159 | } else { | |
1160 | extent = block_extent(whole_volume_extent()); | |
1161 | } | |
1162 | auto req = GuardedRequest(extent, guarded_ctx, is_barrier); | |
1163 | BlockGuardCell *cell = nullptr; | |
1164 | ||
1165 | ldout(m_image_ctx.cct, 20) << dendl; | |
1166 | { | |
1167 | std::lock_guard locker(m_blockguard_lock); | |
1168 | cell = detain_guarded_request_barrier_helper(req); | |
1169 | } | |
1170 | if (cell) { | |
1171 | req.guard_ctx->cell = cell; | |
1172 | req.guard_ctx->complete(0); | |
1173 | } | |
1174 | } | |
1175 | ||
1176 | template <typename I> | |
1177 | void AbstractWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell) | |
1178 | { | |
1179 | CephContext *cct = m_image_ctx.cct; | |
1180 | WriteLogGuard::BlockOperations block_reqs; | |
1181 | ldout(cct, 20) << "released_cell=" << released_cell << dendl; | |
1182 | ||
1183 | { | |
1184 | std::lock_guard locker(m_blockguard_lock); | |
1185 | m_write_log_guard.release(released_cell, &block_reqs); | |
1186 | ||
1187 | for (auto &req : block_reqs) { | |
1188 | req.guard_ctx->state.detained = true; | |
1189 | BlockGuardCell *detained_cell = detain_guarded_request_helper(req); | |
1190 | if (detained_cell) { | |
1191 | if (req.guard_ctx->state.current_barrier) { | |
1192 | /* The current barrier is acquiring the block guard, so now we know its cell */ | |
1193 | m_barrier_cell = detained_cell; | |
1194 | /* detained_cell could be == released_cell here */ | |
1195 | ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl; | |
1196 | } | |
1197 | req.guard_ctx->cell = detained_cell; | |
1198 | m_work_queue.queue(req.guard_ctx); | |
1199 | } | |
1200 | } | |
1201 | ||
1202 | if (m_barrier_in_progress && (released_cell == m_barrier_cell)) { | |
1203 | ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl; | |
1204 | /* The released cell is the current barrier request */ | |
1205 | m_barrier_in_progress = false; | |
1206 | m_barrier_cell = nullptr; | |
1207 | /* Move waiting requests into the blockguard. Stop if there's another barrier */ | |
1208 | while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) { | |
1209 | auto &req = m_awaiting_barrier.front(); | |
1210 | ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl; | |
1211 | BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req); | |
1212 | if (detained_cell) { | |
1213 | req.guard_ctx->cell = detained_cell; | |
1214 | m_work_queue.queue(req.guard_ctx); | |
1215 | } | |
1216 | m_awaiting_barrier.pop_front(); | |
1217 | } | |
1218 | } | |
1219 | } | |
1220 | ||
1221 | ldout(cct, 20) << "exit" << dendl; | |
1222 | } | |
1223 | ||
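/* Move a batch of scheduled ops from m_ops_to_append into ops for appending.
 * Only one thread appends at a time (m_appending); the batch size is capped
 * at MAX_ALLOC_PER_TRANSACTION for RWL and MAX_WRITES_PER_SYNC_POINT
 * otherwise. ops_remain tells the caller to check for further work before
 * returning. */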
1224 | template <typename I> | |
1225 | void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain, | |
1226 | bool &appending, bool isRWL) | |
1227 | { | |
1228 | const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION | |
1229 | : MAX_WRITES_PER_SYNC_POINT; | |
1230 | { | |
1231 | std::lock_guard locker(m_lock); | |
1232 | if (!appending && m_appending) { | |
1233 | /* Another thread is appending */ | |
1234 | ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl; | |
1235 | return; | |
1236 | } | |
1237 | if (m_ops_to_append.size()) { | |
1238 | appending = true; | |
1239 | m_appending = true; | |
1240 | auto last_in_batch = m_ops_to_append.begin(); | |
1241 | unsigned int ops_to_append = m_ops_to_append.size(); | |
1242 | if (ops_to_append > OPS_APPENDED) { | |
1243 | ops_to_append = OPS_APPENDED; | |
1244 | } | |
1245 | std::advance(last_in_batch, ops_to_append); | |
1246 | ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch); | |
1247 | ops_remain = true; /* Always check again before leaving */ | |
1248 | ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", " | |
1249 | << m_ops_to_append.size() << " remain" << dendl; | |
1250 | } else if (isRWL) { | |
1251 | ops_remain = false; | |
1252 | if (appending) { | |
1253 | appending = false; | |
1254 | m_appending = false; | |
1255 | } | |
1256 | } | |
1257 | } | |
1258 | } | |
1259 | ||
1260 | template <typename I> | |
a4b75251 | 1261 | void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req) |
f67539c2 TL |
1262 | { |
1263 | GenericLogOperations to_append(ops.begin(), ops.end()); | |
1264 | ||
a4b75251 | 1265 | schedule_append_ops(to_append, req); |
f67539c2 TL |
1266 | } |
1267 | ||
1268 | template <typename I> | |
a4b75251 | 1269 | void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req) |
f67539c2 TL |
1270 | { |
1271 | GenericLogOperations to_append { op }; | |
1272 | ||
a4b75251 | 1273 | schedule_append_ops(to_append, req); |
f67539c2 TL |
1274 | } |
1275 | ||
1276 | /* | |
1277 | * Complete a set of write ops with the result of append_op_entries. | |
1278 | */ | |
1279 | template <typename I> | |
1280 | void AbstractWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops, | |
1281 | const int result) | |
1282 | { | |
1283 | GenericLogEntries dirty_entries; | |
1284 | int published_reserves = 0; | |
1285 | ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl; | |
1286 | for (auto &op : ops) { | |
1287 | utime_t now = ceph_clock_now(); | |
1288 | auto log_entry = op->get_log_entry(); | |
1289 | log_entry->completed = true; | |
1290 | if (op->is_writing_op()) { | |
1291 | op->mark_log_entry_completed(); | |
1292 | dirty_entries.push_back(log_entry); | |
1293 | } | |
1294 | if (log_entry->is_write_entry()) { | |
1295 | release_ram(log_entry); | |
1296 | } | |
1297 | if (op->reserved_allocated()) { | |
1298 | published_reserves++; | |
1299 | } | |
1300 | { | |
1301 | std::lock_guard locker(m_lock); | |
1302 | m_unpublished_reserves -= published_reserves; | |
1303 | m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries); | |
1304 | } | |
1305 | op->complete(result); | |
1306 | m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t, | |
a4b75251 | 1307 | op->log_append_start_time - op->dispatch_time); |
f67539c2 TL |
1308 | m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time); |
1309 | m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist, | |
1310 | utime_t(now - op->dispatch_time).to_nsec(), | |
1311 | log_entry->ram_entry.write_bytes); | |
a4b75251 | 1312 | utime_t app_lat = op->log_append_comp_time - op->log_append_start_time; |
f67539c2 TL |
1313 | m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat); |
1314 | m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(), | |
1315 | log_entry->ram_entry.write_bytes); | |
a4b75251 | 1316 | m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time); |
f67539c2 TL |
1317 | } |
1318 | // New entries may be flushable | |
1319 | { | |
1320 | std::lock_guard locker(m_lock); | |
1321 | wake_up(); | |
1322 | } | |
1323 | } | |
1324 | ||
1325 | /** | |
1326 | * Dispatch as many deferred writes as possible | |
1327 | */ | |
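/* Only one thread acts as dispatcher at a time (m_dispatching_deferred_ops).
 * The dispatcher repeatedly tries to allocate resources for the request at
 * the front of m_deferred_ios; each successfully allocated request is popped
 * and dispatched, and a failed allocation ends dispatching until resources
 * are freed and this is called again. */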
1328 | template <typename I> | |
1329 | void AbstractWriteLog<I>::dispatch_deferred_writes(void) | |
1330 | { | |
1331 | C_BlockIORequestT *front_req = nullptr; /* req still on front of deferred list */ | |
1332 | C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */ | |
1333 | bool allocated = false; /* front_req allocate succeeded */ | |
1334 | bool cleared_dispatching_flag = false; | |
1335 | ||
1336 | /* If we can't become the dispatcher, we'll exit */ | |
1337 | { | |
1338 | std::lock_guard locker(m_lock); | |
1339 | if (m_dispatching_deferred_ops || | |
1340 | !m_deferred_ios.size()) { | |
1341 | return; | |
1342 | } | |
1343 | m_dispatching_deferred_ops = true; | |
1344 | } | |
1345 | ||
1346 | /* There are ops to dispatch, and this should be the only thread dispatching them */ | |
1347 | { | |
1348 | std::lock_guard deferred_dispatch(m_deferred_dispatch_lock); | |
1349 | do { | |
1350 | { | |
1351 | std::lock_guard locker(m_lock); | |
1352 | ceph_assert(m_dispatching_deferred_ops); | |
1353 | if (allocated) { | |
1354 | /* On the 2nd through (n-1)th passes through this lock, front_req->alloc_resources() | |
1355 | * will have succeeded, and we'll need to pop it off the deferred ops list | |
1356 | * here. */ | |
1357 | ceph_assert(front_req); | |
1358 | ceph_assert(!allocated_req); | |
1359 | m_deferred_ios.pop_front(); | |
1360 | allocated_req = front_req; | |
1361 | front_req = nullptr; | |
1362 | allocated = false; | |
1363 | } | |
1364 | ceph_assert(!allocated); | |
1365 | if (!allocated && front_req) { | |
1366 | /* front_req->alloc_resources() failed on the last iteration. | |
1367 | * We'll stop dispatching. */ | |
1368 | wake_up(); | |
1369 | front_req = nullptr; | |
1370 | ceph_assert(!cleared_dispatching_flag); | |
1371 | m_dispatching_deferred_ops = false; | |
1372 | cleared_dispatching_flag = true; | |
1373 | } else { | |
1374 | ceph_assert(!front_req); | |
1375 | if (m_deferred_ios.size()) { | |
1376 | /* New allocation candidate */ | |
1377 | front_req = m_deferred_ios.front(); | |
1378 | } else { | |
1379 | ceph_assert(!cleared_dispatching_flag); | |
1380 | m_dispatching_deferred_ops = false; | |
1381 | cleared_dispatching_flag = true; | |
1382 | } | |
1383 | } | |
1384 | } | |
1385 | /* Try allocating for front_req before we decide what to do with allocated_req | |
1386 | * (if any) */ | |
1387 | if (front_req) { | |
1388 | ceph_assert(!cleared_dispatching_flag); | |
1389 | allocated = front_req->alloc_resources(); | |
1390 | } | |
1391 | if (allocated_req && front_req && allocated) { | |
1392 | /* Push dispatch of the first allocated req to a wq */ | |
1393 | m_work_queue.queue(new LambdaContext( | |
1394 | [this, allocated_req](int r) { | |
1395 | allocated_req->dispatch(); | |
1396 | }), 0); | |
1397 | allocated_req = nullptr; | |
1398 | } | |
1399 | ceph_assert(!(allocated_req && front_req && allocated)); | |
1400 | ||
1401 | /* Continue while we're still considering the front of the deferred ops list */ | |
1402 | } while (front_req); | |
1403 | ceph_assert(!allocated); | |
1404 | } | |
1405 | ceph_assert(cleared_dispatching_flag); | |
1406 | ||
1407 | /* If any deferred requests were allocated, the last one will still be in allocated_req */ | |
1408 | if (allocated_req) { | |
1409 | allocated_req->dispatch(); | |
1410 | } | |
1411 | } | |
1412 | ||
1413 | /** | |
1414 | * Returns the lanes used by this write, and attempts to dispatch the next | |
1415 | * deferred write | |
1416 | */ | |
1417 | template <typename I> | |
1418 | void AbstractWriteLog<I>::release_write_lanes(C_BlockIORequestT *req) | |
1419 | { | |
1420 | { | |
1421 | std::lock_guard locker(m_lock); | |
1422 | m_free_lanes += req->image_extents.size(); | |
1423 | } | |
1424 | dispatch_deferred_writes(); | |
1425 | } | |
1426 | ||
1427 | /** | |
1428 | * Attempts to allocate log resources for a write. Write is dispatched if | |
1429 | * resources are available, or queued if they aren't. | |
1430 | */ | |
1431 | template <typename I> | |
1432 | void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req) | |
1433 | { | |
1434 | bool dispatch_here = false; | |
1435 | ||
1436 | { | |
1437 | /* If there are already deferred writes, queue behind them for resources */ | |
1438 | { | |
1439 | std::lock_guard locker(m_lock); | |
1440 | dispatch_here = m_deferred_ios.empty(); | |
1441 | // Only a flush req has total_bytes set to the max uint64 | |
a4b75251 TL |
1442 | if (req->image_extents_summary.total_bytes == |
1443 | std::numeric_limits<uint64_t>::max() && | |
1444 | static_cast<C_FlushRequestT *>(req)->internal == true) { | |
f67539c2 TL |
1445 | dispatch_here = true; |
1446 | } | |
1447 | } | |
1448 | if (dispatch_here) { | |
1449 | dispatch_here = req->alloc_resources(); | |
1450 | } | |
1451 | if (dispatch_here) { | |
1452 | ldout(m_image_ctx.cct, 20) << "dispatching" << dendl; | |
1453 | req->dispatch(); | |
1454 | } else { | |
1455 | req->deferred(); | |
1456 | { | |
1457 | std::lock_guard locker(m_lock); | |
1458 | m_deferred_ios.push_back(req); | |
1459 | } | |
1460 | ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl; | |
1461 | dispatch_deferred_writes(); | |
1462 | } | |
1463 | } | |
1464 | } | |
1465 | ||
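/* Check whether the lanes, log entries, and buffer space needed by req can be
 * reserved. On success the resource counters are updated under m_lock and
 * true is returned. A failure for lack of entries or buffer space (as opposed
 * to lanes) is recorded via m_alloc_failed_since_retire so that flushing and
 * retiring are expedited. */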
1466 | template <typename I> | |
a4b75251 TL |
1467 | bool AbstractWriteLog<I>::check_allocation( |
1468 | C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied, | |
1469 | uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries, | |
1470 | uint32_t num_unpublished_reserves) { | |
f67539c2 TL |
1471 | bool alloc_succeeds = true; |
1472 | bool no_space = false; | |
1473 | { | |
1474 | std::lock_guard locker(m_lock); | |
1475 | if (m_free_lanes < num_lanes) { | |
1476 | req->set_io_waited_for_lanes(true); | |
1477 | ldout(m_image_ctx.cct, 20) << "not enough free lanes (need " | |
1478 | << num_lanes | |
1479 | << ", have " << m_free_lanes << ") " | |
1480 | << *req << dendl; | |
1481 | alloc_succeeds = false; | |
1482 | /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */ | |
1483 | } | |
1484 | if (m_free_log_entries < num_log_entries) { | |
1485 | req->set_io_waited_for_entries(true); | |
1486 | ldout(m_image_ctx.cct, 20) << "not enough free entries (need " | |
1487 | << num_log_entries | |
1488 | << ", have " << m_free_log_entries << ") " | |
1489 | << *req << dendl; | |
1490 | alloc_succeeds = false; | |
1491 | no_space = true; /* Entries must be retired */ | |
1492 | } | |
1493 | /* Don't attempt buffer allocation if we've exceeded the "full" threshold */ | |
a4b75251 | 1494 | if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) { |
f67539c2 TL |
1495 | if (!req->has_io_waited_for_buffers()) { |
1496 | req->set_io_waited_for_buffers(true); | |
a4b75251 TL |
1497 | ldout(m_image_ctx.cct, 5) << "Waiting for allocation cap (cap=" |
1498 | << m_bytes_allocated_cap | |
f67539c2 TL |
1499 | << ", allocated=" << m_bytes_allocated |
1500 | << ") in write [" << *req << "]" << dendl; | |
1501 | } | |
1502 | alloc_succeeds = false; | |
1503 | no_space = true; /* Entries must be retired */ | |
1504 | } | |
1505 | } | |
1506 | ||
1507 | if (alloc_succeeds) { | |
1508 | reserve_cache(req, alloc_succeeds, no_space); | |
1509 | } | |
1510 | ||
1511 | if (alloc_succeeds) { | |
1512 | std::lock_guard locker(m_lock); | |
1513 | /* We need one free log entry per extent (each is a separate entry), and | |
1514 | * one free "lane" for remote replication. */ | |
1515 | if ((m_free_lanes >= num_lanes) && | |
a4b75251 TL |
1516 | (m_free_log_entries >= num_log_entries) && |
1517 | (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) { | |
f67539c2 TL |
1518 | m_free_lanes -= num_lanes; |
1519 | m_free_log_entries -= num_log_entries; | |
1520 | m_unpublished_reserves += num_unpublished_reserves; | |
1521 | m_bytes_allocated += bytes_allocated; | |
1522 | m_bytes_cached += bytes_cached; | |
1523 | m_bytes_dirty += bytes_dirtied; | |
1524 | if (req->has_io_waited_for_buffers()) { | |
1525 | req->set_io_waited_for_buffers(false); | |
1526 | } | |
f67539c2 TL |
1527 | } else { |
1528 | alloc_succeeds = false; | |
1529 | } | |
1530 | } | |
1531 | ||
1532 | if (!alloc_succeeds && no_space) { | |
1533 | /* Expedite flushing and/or retiring */ | |
1534 | std::lock_guard locker(m_lock); | |
1535 | m_alloc_failed_since_retire = true; | |
1536 | m_last_alloc_fail = ceph_clock_now(); | |
1537 | } | |
1538 | ||
1539 | return alloc_succeeds; | |
1540 | } | |
1541 | ||
1542 | template <typename I> | |
1543 | C_FlushRequest<AbstractWriteLog<I>>* AbstractWriteLog<I>::make_flush_req(Context *on_finish) { | |
1544 | utime_t flush_begins = ceph_clock_now(); | |
1545 | bufferlist bl; | |
1546 | auto *flush_req = | |
1547 | new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}), | |
1548 | std::move(bl), 0, m_lock, m_perfcounter, on_finish); | |
1549 | ||
1550 | return flush_req; | |
1551 | } | |
1552 | ||
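/* Schedule process_work() on the work queue. Must be called with m_lock held;
 * at most one wake-up is scheduled at a time, but a wake-up can be
 * re-requested while one is already scheduled. */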
1553 | template <typename I> | |
1554 | void AbstractWriteLog<I>::wake_up() { | |
1555 | CephContext *cct = m_image_ctx.cct; | |
1556 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1557 | ||
1558 | if (!m_wake_up_enabled) { | |
1559 | // wake_up is disabled during shutdown after flushing completes | |
1560 | ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl; | |
1561 | return; | |
1562 | } | |
1563 | ||
1564 | if (m_wake_up_requested && m_wake_up_scheduled) { | |
1565 | return; | |
1566 | } | |
1567 | ||
1568 | ldout(cct, 20) << dendl; | |
1569 | ||
1570 | /* Wake-up can be requested while it's already scheduled */ | |
1571 | m_wake_up_requested = true; | |
1572 | ||
1573 | /* Wake-up cannot be scheduled if it's already scheduled */ | |
1574 | if (m_wake_up_scheduled) { | |
1575 | return; | |
1576 | } | |
1577 | m_wake_up_scheduled = true; | |
1578 | m_async_process_work++; | |
1579 | m_async_op_tracker.start_op(); | |
1580 | m_work_queue.queue(new LambdaContext( | |
1581 | [this](int r) { | |
1582 | process_work(); | |
1583 | m_async_op_tracker.finish_op(); | |
1584 | m_async_process_work--; | |
1585 | }), 0); | |
1586 | } | |
1587 | ||
1588 | template <typename I> | |
1589 | bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) { | |
1590 | CephContext *cct = m_image_ctx.cct; | |
1591 | ||
1592 | ldout(cct, 20) << "" << dendl; | |
1593 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1594 | ||
1595 | if (m_invalidating) { | |
1596 | return true; | |
1597 | } | |
1598 | ||
1599 | /* For OWB we can flush entries with the same sync gen number (writes between | |
1600 | * aio_flush() calls) concurrently. Here we'll consider an entry flushable if | |
1601 | * its sync gen number is <= the lowest sync gen number carried by all the | |
1602 | * entries currently flushing. | |
1603 | * | |
1604 | * If the entry considered here bears a sync gen number lower than a | |
1605 | * previously flushed entry, the application had to have submitted the write | |
1606 | * bearing the higher gen number before the write with the lower gen number | |
1607 | * completed. So, flushing these concurrently is OK. | |
1608 | * | |
1609 | * If the entry considered here bears a sync gen number higher than a | |
1610 | * currently flushing entry, the write with the lower gen number may have | |
1611 | * completed to the application before the write with the higher sync gen | |
1612 | * number was submitted, and the application may rely on that completion | |
1613 | * order for volume consistency. In this case the entry will not be | |
1614 | * considered flushable until all the entries bearing lower sync gen numbers | |
1615 | * finish flushing. | |
1616 | */ | |
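/* For example: with a gen-4 entry already flushing (m_lowest_flushing_sync_gen
 * is 4), further gen-4 entries are flushable now, but a gen-5 entry is
 * deferred until the in-flight flushes drain. */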
1617 | ||
1618 | if (m_flush_ops_in_flight && | |
1619 | (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) { | |
1620 | return false; | |
1621 | } | |
1622 | ||
1623 | return (log_entry->can_writeback() && | |
1624 | (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) && | |
1625 | (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT)); | |
1626 | } | |
1627 | ||
1628 | template <typename I> | |
1629 | Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry, | |
1630 | bool invalidating) { | |
1631 | CephContext *cct = m_image_ctx.cct; | |
1632 | ||
1633 | ldout(cct, 20) << "" << dendl; | |
1634 | ceph_assert(m_entry_reader_lock.is_locked()); | |
1635 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1636 | if (!m_flush_ops_in_flight || | |
1637 | (log_entry->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) { | |
1638 | m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number; | |
1639 | } | |
1640 | m_flush_ops_in_flight += 1; | |
a4b75251 | 1641 | m_flush_ops_will_send += 1; |
f67539c2 TL |
1642 | /* For write same this is the bytes affected by the flush op, not the bytes transferred */ |
1643 | m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes; | |
1644 | ||
1645 | /* Flush write completion action */ | |
a4b75251 | 1646 | utime_t writeback_start_time = ceph_clock_now(); |
f67539c2 | 1647 | Context *ctx = new LambdaContext( |
a4b75251 TL |
1648 | [this, log_entry, writeback_start_time, invalidating](int r) { |
1649 | utime_t writeback_comp_time = ceph_clock_now(); | |
1650 | m_perfcounter->tinc(l_librbd_pwl_writeback_latency, | |
1651 | writeback_comp_time - writeback_start_time); | |
f67539c2 TL |
1652 | { |
1653 | std::lock_guard locker(m_lock); | |
1654 | if (r < 0) { | |
1655 | lderr(m_image_ctx.cct) << "failed to flush log entry" | |
1656 | << cpp_strerror(r) << dendl; | |
1657 | m_dirty_log_entries.push_front(log_entry); | |
1658 | } else { | |
1659 | ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty()); | |
1660 | log_entry->set_flushed(true); | |
1661 | m_bytes_dirty -= log_entry->bytes_dirty(); | |
1662 | sync_point_writer_flushed(log_entry->get_sync_point_entry()); | |
1663 | ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry | |
1664 | << " invalidating=" << invalidating | |
1665 | << dendl; | |
1666 | } | |
1667 | m_flush_ops_in_flight -= 1; | |
1668 | m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes; | |
1669 | wake_up(); | |
1670 | } | |
1671 | }); | |
1672 | /* Flush through lower cache before completing */ | |
1673 | ctx = new LambdaContext( | |
1674 | [this, ctx](int r) { | |
1675 | if (r < 0) { | |
1676 | lderr(m_image_ctx.cct) << "failed to flush log entry" | |
1677 | << cpp_strerror(r) << dendl; | |
1678 | ctx->complete(r); | |
1679 | } else { | |
1680 | m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx); | |
1681 | } | |
1682 | }); | |
1683 | return ctx; | |
1684 | } | |
1685 | ||
1686 | template <typename I> | |
1687 | void AbstractWriteLog<I>::process_writeback_dirty_entries() { | |
1688 | CephContext *cct = m_image_ctx.cct; | |
1689 | bool all_clean = false; | |
1690 | int flushed = 0; | |
a4b75251 | 1691 | bool has_write_entry = false; |
f67539c2 TL |
1692 | |
1693 | ldout(cct, 20) << "Look for dirty entries" << dendl; | |
1694 | { | |
1695 | DeferredContexts post_unlock; | |
a4b75251 TL |
1696 | GenericLogEntries entries_to_flush; |
1697 | ||
f67539c2 | 1698 | std::shared_lock entry_reader_locker(m_entry_reader_lock); |
a4b75251 | 1699 | std::lock_guard locker(m_lock); |
f67539c2 | 1700 | while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) { |
f67539c2 TL |
1701 | if (m_shutting_down) { |
1702 | ldout(cct, 5) << "Flush during shutdown supressed" << dendl; | |
1703 | /* Only signal flush completion when all flush ops have finished */ | |
1704 | all_clean = !m_flush_ops_in_flight; | |
1705 | break; | |
1706 | } | |
1707 | if (m_dirty_log_entries.empty()) { | |
1708 | ldout(cct, 20) << "Nothing new to flush" << dendl; | |
1709 | /* Only signal flush completion when all flush ops have finished */ | |
1710 | all_clean = !m_flush_ops_in_flight; | |
1711 | break; | |
1712 | } | |
a4b75251 TL |
1713 | |
1714 | if (m_flush_ops_will_send) { | |
1715 | ldout(cct, 20) << "Previous flush-ops is still not sent" << dendl; | |
1716 | break; | |
1717 | } | |
f67539c2 TL |
1718 | auto candidate = m_dirty_log_entries.front(); |
1719 | bool flushable = can_flush_entry(candidate); | |
1720 | if (flushable) { | |
a4b75251 | 1721 | entries_to_flush.push_back(candidate); |
f67539c2 | 1722 | flushed++; |
a4b75251 TL |
1723 | if (!has_write_entry) |
1724 | has_write_entry = candidate->is_write_entry(); | |
f67539c2 TL |
1725 | m_dirty_log_entries.pop_front(); |
1726 | } else { | |
1727 | ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl; | |
1728 | break; | |
1729 | } | |
1730 | } | |
a4b75251 TL |
1731 | |
1732 | construct_flush_entries(entries_to_flush, post_unlock, has_write_entry); | |
f67539c2 TL |
1733 | } |
1734 | ||
1735 | if (all_clean) { | |
1736 | /* All flushing complete, drain outside lock */ | |
1737 | Contexts flush_contexts; | |
1738 | { | |
1739 | std::lock_guard locker(m_lock); | |
1740 | flush_contexts.swap(m_flush_complete_contexts); | |
1741 | } | |
1742 | finish_contexts(m_image_ctx.cct, flush_contexts, 0); | |
1743 | } | |
1744 | } | |
1745 | ||
1746 | /* Returns true if the specified SyncPointLogEntry is considered flushed; if so, | |
1747 | * the log is updated to reflect this. */ | |
1748 | template <typename I> | |
1749 | bool AbstractWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry) | |
1750 | { | |
1751 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1752 | ceph_assert(log_entry); | |
1753 | ||
1754 | if ((log_entry->writes_flushed == log_entry->writes) && | |
1755 | log_entry->completed && log_entry->prior_sync_point_flushed && | |
1756 | log_entry->next_sync_point_entry) { | |
1757 | ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point=" | |
1758 | << *log_entry << dendl; | |
1759 | log_entry->next_sync_point_entry->prior_sync_point_flushed = true; | |
1760 | /* Don't move the flushed sync gen num backwards. */ | |
1761 | if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) { | |
1762 | m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number; | |
1763 | } | |
1764 | m_async_op_tracker.start_op(); | |
1765 | m_work_queue.queue(new LambdaContext( | |
a4b75251 | 1766 | [this, next = std::move(log_entry->next_sync_point_entry)](int r) { |
f67539c2 TL |
1767 | bool handled_by_next; |
1768 | { | |
1769 | std::lock_guard locker(m_lock); | |
a4b75251 | 1770 | handled_by_next = handle_flushed_sync_point(std::move(next)); |
f67539c2 TL |
1771 | } |
1772 | if (!handled_by_next) { | |
1773 | persist_last_flushed_sync_gen(); | |
1774 | } | |
1775 | m_async_op_tracker.finish_op(); | |
1776 | })); | |
1777 | return true; | |
1778 | } | |
1779 | return false; | |
1780 | } | |
1781 | ||
1782 | template <typename I> | |
1783 | void AbstractWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry) | |
1784 | { | |
1785 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1786 | ceph_assert(log_entry); | |
1787 | log_entry->writes_flushed++; | |
1788 | ||
1789 | /* If this entry might be completely flushed, look closer */ | |
1790 | if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) { | |
1791 | ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point=" | |
1792 | << *log_entry << dendl; | |
1793 | handle_flushed_sync_point(log_entry); | |
1794 | } | |
1795 | } | |
1796 | ||
1797 | /* Make a new sync point and flush the previous during initialization, when there may or may | |
1798 | * not be a previous sync point */ | |
1799 | template <typename I> | |
1800 | void AbstractWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) { | |
1801 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1802 | ceph_assert(!m_initialized); /* Don't use this after init */ | |
1803 | ||
1804 | if (!m_current_sync_point) { | |
1805 | /* First sync point since start */ | |
1806 | new_sync_point(later); | |
1807 | } else { | |
1808 | flush_new_sync_point(nullptr, later); | |
1809 | } | |
1810 | } | |
1811 | ||
1812 | /** | |
1813 | * Begin a new sync point | |
1814 | */ | |
1815 | template <typename I> | |
1816 | void AbstractWriteLog<I>::new_sync_point(DeferredContexts &later) { | |
1817 | CephContext *cct = m_image_ctx.cct; | |
1818 | std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point; | |
1819 | std::shared_ptr<SyncPoint> new_sync_point; | |
1820 | ldout(cct, 20) << dendl; | |
1821 | ||
1822 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1823 | ||
1824 | /* The first time this is called, if this is a newly created log, | |
1825 | * this makes the first sync gen number we'll use 1. On the first | |
1826 | * call for a re-opened log m_current_sync_gen will be the highest | |
1827 | * gen number from all the sync point entries found in the re-opened | |
1828 | * log, and this advances to the next sync gen number. */ | |
1829 | ++m_current_sync_gen; | |
1830 | ||
1831 | new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct); | |
1832 | m_current_sync_point = new_sync_point; | |
1833 | ||
1834 | /* If this log has been re-opened, old_sync_point will initially be | |
1835 | * nullptr, but m_current_sync_gen may not be zero. */ | |
1836 | if (old_sync_point) { | |
1837 | new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num); | |
1838 | m_perfcounter->hinc(l_librbd_pwl_syncpoint_hist, | |
1839 | old_sync_point->log_entry->writes, | |
1840 | old_sync_point->log_entry->bytes); | |
1841 | /* This sync point will acquire no more sub-ops. Activation needs | |
1842 | * to acquire m_lock, so defer to later. */ | |
1843 | later.add(new LambdaContext( | |
1844 | [this, old_sync_point](int r) { | |
1845 | old_sync_point->prior_persisted_gather_activate(); | |
1846 | })); | |
1847 | } | |
1848 | ||
1849 | new_sync_point->prior_persisted_gather_set_finisher(); | |
1850 | ||
1851 | if (old_sync_point) { | |
1852 | ldout(cct,6) << "new sync point = [" << *m_current_sync_point | |
1853 | << "], prior = [" << *old_sync_point << "]" << dendl; | |
1854 | } else { | |
1855 | ldout(cct,6) << "first sync point = [" << *m_current_sync_point | |
1856 | << "]" << dendl; | |
1857 | } | |
1858 | } | |
1859 | ||
1860 | template <typename I> | |
1861 | void AbstractWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req, | |
1862 | DeferredContexts &later) { | |
1863 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1864 | ||
1865 | if (!flush_req) { | |
1866 | m_async_null_flush_finish++; | |
1867 | m_async_op_tracker.start_op(); | |
1868 | Context *flush_ctx = new LambdaContext([this](int r) { | |
1869 | m_async_null_flush_finish--; | |
1870 | m_async_op_tracker.finish_op(); | |
1871 | }); | |
1872 | flush_req = make_flush_req(flush_ctx); | |
1873 | flush_req->internal = true; | |
1874 | } | |
1875 | ||
1876 | /* Add a new sync point. */ | |
1877 | new_sync_point(later); | |
1878 | std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point; | |
1879 | ceph_assert(to_append); | |
1880 | ||
1881 | /* This flush request will append/persist the (now) previous sync point */ | |
1882 | flush_req->to_append = to_append; | |
1883 | ||
1884 | /* When the m_sync_point_persist Gather completes this sync point can be | |
1885 | * appended. The only sub for this Gather is the finisher Context for | |
1886 | * m_prior_log_entries_persisted, which records the result of the Gather in | |
1887 | * the sync point, and completes. TODO: Do we still need both of these | |
1888 | * Gathers? */ | |
1889 | Context * ctx = new LambdaContext([this, flush_req](int r) { | |
1890 | ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req | |
1891 | << " sync point =" << flush_req->to_append | |
1892 | << ". Ready to persist." << dendl; | |
1893 | alloc_and_dispatch_io_req(flush_req); | |
1894 | }); | |
1895 | to_append->persist_gather_set_finisher(ctx); | |
1896 | ||
1897 | /* The m_sync_point_persist Gather has all the subs it will ever have, and | |
1898 | * now has its finisher. If the sub is already complete, activation will | |
1899 | * complete the Gather. The finisher will acquire m_lock, so we'll activate | |
1900 | * this when we release m_lock.*/ | |
1901 | later.add(new LambdaContext([this, to_append](int r) { | |
1902 | to_append->persist_gather_activate(); | |
1903 | })); | |
1904 | ||
1905 | /* The flush request completes when the sync point persists */ | |
1906 | to_append->add_in_on_persisted_ctxs(flush_req); | |
1907 | } | |
1908 | ||
1909 | template <typename I> | |
1910 | void AbstractWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, | |
1911 | DeferredContexts &later) { | |
1912 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1913 | ||
1914 | /* If there have been writes since the last sync point ... */ | |
1915 | if (m_current_sync_point->log_entry->writes) { | |
1916 | flush_new_sync_point(flush_req, later); | |
1917 | } else { | |
1918 | /* There have been no writes to the current sync point. */ | |
1919 | if (m_current_sync_point->earlier_sync_point) { | |
1920 | /* If previous sync point hasn't completed, complete this flush | |
1921 | * with the earlier sync point. No alloc or dispatch needed. */ | |
1922 | m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req); | |
1923 | } else { | |
1924 | /* The previous sync point has already completed and been | |
1925 | * appended. The current sync point has no writes, so this flush | |
1926 | * has nothing to wait for. This flush completes now. */ | |
1927 | later.add(flush_req); | |
1928 | } | |
1929 | } | |
1930 | } | |
1931 | ||
1932 | /* | |
1933 | * RWL internal flush - will actually flush the RWL. | |
1934 | * | |
1935 | * User flushes should arrive at aio_flush(), and only flush prior | |
1936 | * writes to all log replicas. | |
1937 | * | |
1938 | * Librbd internal flushes will arrive at flush(invalidate=false, | |
1939 | * discard=false), and traverse the block guard to ensure in-flight writes are | |
1940 | * flushed. | |
1941 | */ | |
1942 | template <typename I> | |
1943 | void AbstractWriteLog<I>::flush_dirty_entries(Context *on_finish) { | |
1944 | CephContext *cct = m_image_ctx.cct; | |
1945 | bool all_clean; | |
1946 | bool flushing; | |
1947 | bool stop_flushing; | |
1948 | ||
1949 | { | |
1950 | std::lock_guard locker(m_lock); | |
1951 | flushing = (0 != m_flush_ops_in_flight); | |
1952 | all_clean = m_dirty_log_entries.empty(); | |
1953 | stop_flushing = (m_shutting_down); | |
1954 | } | |
1955 | ||
1956 | if (!flushing && (all_clean || stop_flushing)) { | |
1957 | /* Complete without holding m_lock */ | |
1958 | if (all_clean) { | |
1959 | ldout(cct, 20) << "no dirty entries" << dendl; | |
1960 | } else { | |
1961 | ldout(cct, 5) << "flush during shutdown suppressed" << dendl; | |
1962 | } | |
1963 | on_finish->complete(0); | |
1964 | } else { | |
1965 | if (all_clean) { | |
1966 | ldout(cct, 5) << "flush ops still in progress" << dendl; | |
1967 | } else { | |
1968 | ldout(cct, 20) << "dirty entries remain" << dendl; | |
1969 | } | |
1970 | std::lock_guard locker(m_lock); | |
1971 | /* on_finish can't be completed yet */ | |
1972 | m_flush_complete_contexts.push_back(new LambdaContext( | |
1973 | [this, on_finish](int r) { | |
1974 | flush_dirty_entries(on_finish); | |
1975 | })); | |
1976 | wake_up(); | |
1977 | } | |
1978 | } | |
1979 | ||
1980 | template <typename I> | |
1981 | void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) { | |
1982 | ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl; | |
1983 | ||
1984 | if (m_perfcounter) { | |
1985 | if (invalidate) { | |
1986 | m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1); | |
1987 | } else { | |
a4b75251 | 1988 | m_perfcounter->inc(l_librbd_pwl_internal_flush, 1); |
f67539c2 TL |
1989 | } |
1990 | } | |
1991 | ||
1992 | /* May be called even if initialization fails */ | |
1993 | if (!m_initialized) { | |
1994 | ldout(m_image_ctx.cct, 5) << "never initialized" << dendl; | |
1995 | /* Deadlock if completed here */ | |
1996 | m_image_ctx.op_work_queue->queue(on_finish, 0); | |
1997 | return; | |
1998 | } | |
1999 | ||
2000 | /* Flush/invalidate must pass through block guard to ensure all layers of | |
2001 | * cache are consistently flush/invalidated. This ensures no in-flight write leaves | |
2002 | * some layers with valid regions, which may later produce inconsistent read | |
2003 | * results. */ | |
2004 | GuardedRequestFunctionContext *guarded_ctx = | |
2005 | new GuardedRequestFunctionContext( | |
2006 | [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) { | |
2007 | DeferredContexts on_exit; | |
2008 | ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl; | |
2009 | ceph_assert(guard_ctx.cell); | |
2010 | ||
2011 | Context *ctx = new LambdaContext( | |
2012 | [this, cell=guard_ctx.cell, invalidate, on_finish](int r) { | |
2013 | std::lock_guard locker(m_lock); | |
2014 | m_invalidating = false; | |
2015 | ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate=" | |
2016 | << invalidate << ")" << dendl; | |
2017 | if (m_log_entries.size()) { | |
2018 | ldout(m_image_ctx.cct, 1) << "m_log_entries.size()=" | |
2019 | << m_log_entries.size() << ", " | |
2020 | << "front()=" << *m_log_entries.front() | |
2021 | << dendl; | |
2022 | } | |
2023 | if (invalidate) { | |
2024 | ceph_assert(m_log_entries.size() == 0); | |
2025 | } | |
2026 | ceph_assert(m_dirty_log_entries.size() == 0); | |
2027 | m_image_ctx.op_work_queue->queue(on_finish, r); | |
2028 | release_guarded_request(cell); | |
2029 | }); | |
2030 | ctx = new LambdaContext( | |
2031 | [this, ctx, invalidate](int r) { | |
2032 | Context *next_ctx = ctx; | |
2033 | ldout(m_image_ctx.cct, 6) << "flush_dirty_entries finished" << dendl; | |
2034 | if (r < 0) { | |
2035 | /* Override on_finish status with this error */ | |
2036 | next_ctx = new LambdaContext([r, ctx](int _r) { | |
2037 | ctx->complete(r); | |
2038 | }); | |
2039 | } | |
2040 | if (invalidate) { | |
2041 | { | |
2042 | std::lock_guard locker(m_lock); | |
2043 | ceph_assert(m_dirty_log_entries.size() == 0); | |
2044 | ceph_assert(!m_invalidating); | |
2045 | ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl; | |
2046 | m_invalidating = true; | |
2047 | } | |
2048 | /* Discards all RWL entries */ | |
2049 | while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { } | |
2050 | next_ctx->complete(0); | |
2051 | } else { | |
2052 | { | |
2053 | std::lock_guard locker(m_lock); | |
2054 | ceph_assert(m_dirty_log_entries.size() == 0); | |
2055 | ceph_assert(!m_invalidating); | |
2056 | } | |
2057 | m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx); | |
2058 | } | |
2059 | }); | |
2060 | ctx = new LambdaContext( | |
2061 | [this, ctx](int r) { | |
2062 | flush_dirty_entries(ctx); | |
2063 | }); | |
2064 | std::lock_guard locker(m_lock); | |
2065 | /* Even if we're throwing everything away, we want the last entry to | |
2066 | * be a sync point so we can cleanly resume. | |
2067 | * | |
2068 | * Also, the blockguard only guarantees the replication of this op | |
2069 | * can't overlap with prior ops. It doesn't guarantee those are all | |
2070 | * completed and eligible for flush & retire, which we require here. | |
2071 | */ | |
2072 | auto flush_req = make_flush_req(ctx); | |
2073 | flush_new_sync_point_if_needed(flush_req, on_exit); | |
2074 | }); | |
2075 | detain_guarded_request(nullptr, guarded_ctx, true); | |
2076 | } | |
2077 | ||
2078 | template <typename I> | |
2079 | void AbstractWriteLog<I>::add_into_log_map(GenericWriteLogEntries &log_entries, | |
2080 | C_BlockIORequestT *req) { | |
2081 | req->copy_cache(); | |
2082 | m_blocks_to_log_entries.add_log_entries(log_entries); | |
2083 | } | |
2084 | ||
2085 | template <typename I> | |
2086 | bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) { | |
2087 | CephContext *cct = m_image_ctx.cct; | |
2088 | ||
2089 | ldout(cct, 20) << dendl; | |
2090 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
2091 | return log_entry->can_retire(); | |
2092 | } | |
2093 | ||
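/* Assert that the cache has fully quiesced: nothing deferred, queued,
 * in flight, or dirty remains, so the image cache state can be recorded as
 * clean. */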
a4b75251 TL |
2094 | template <typename I> |
2095 | void AbstractWriteLog<I>::check_image_cache_state_clean() { | |
2096 | ceph_assert(m_deferred_ios.empty()); | |
2097 | ceph_assert(m_ops_to_append.empty()); | |
2098 | ceph_assert(m_async_flush_ops == 0); | |
2099 | ceph_assert(m_async_append_ops == 0); | |
2100 | ceph_assert(m_dirty_log_entries.empty()); | |
2101 | ceph_assert(m_ops_to_flush.empty()); | |
2102 | ceph_assert(m_flush_ops_in_flight == 0); | |
2103 | ceph_assert(m_flush_bytes_in_flight == 0); | |
2104 | ceph_assert(m_bytes_dirty == 0); | |
2105 | ceph_assert(m_work_queue.empty()); | |
2106 | } | |
2107 | ||
f67539c2 TL |
2108 | } // namespace pwl |
2109 | } // namespace cache | |
2110 | } // namespace librbd | |
2111 | ||
2112 | template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>; |