1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "AbstractWriteLog.h" | |
5 | #include "include/buffer.h" | |
6 | #include "include/Context.h" | |
7 | #include "include/ceph_assert.h" | |
8 | #include "common/deleter.h" | |
9 | #include "common/dout.h" | |
10 | #include "common/environment.h" | |
11 | #include "common/errno.h" | |
12 | #include "common/hostname.h" | |
13 | #include "common/WorkQueue.h" | |
14 | #include "common/Timer.h" | |
15 | #include "common/perf_counters.h" | |
16 | #include "librbd/ImageCtx.h" | |
17 | #include "librbd/asio/ContextWQ.h" | |
18 | #include "librbd/cache/pwl/ImageCacheState.h" | |
19 | #include "librbd/cache/pwl/LogEntry.h" | |
20 | #include "librbd/plugin/Api.h" | |
21 | #include <map> | |
22 | #include <vector> | |
23 | ||
24 | #undef dout_subsys | |
25 | #define dout_subsys ceph_subsys_rbd_pwl | |
26 | #undef dout_prefix | |
27 | #define dout_prefix *_dout << "librbd::cache::pwl::AbstractWriteLog: " << this \ | |
28 | << " " << __func__ << ": " | |
29 | ||
30 | namespace librbd { | |
31 | namespace cache { | |
32 | namespace pwl { | |
33 | ||
34 | using namespace std; |
35 | using namespace librbd::cache::pwl; |
36 | ||
37 | typedef AbstractWriteLog<ImageCtx>::Extent Extent; | |
38 | typedef AbstractWriteLog<ImageCtx>::Extents Extents; | |
39 | ||
40 | template <typename I> | |
41 | AbstractWriteLog<I>::AbstractWriteLog( | |
42 | I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state, | |
43 | Builder<This> *builder, cache::ImageWritebackInterface& image_writeback, | |
44 | plugin::Api<I>& plugin_api) | |
45 | : m_builder(builder), | |
46 | m_write_log_guard(image_ctx.cct), | |
47 | m_flush_guard(image_ctx.cct), |
48 | m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name( | |
49 | "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))), | |
50 | m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name( |
51 | "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))), | |
52 | m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name( | |
53 | "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))), | |
54 | m_thread_pool( | |
55 | image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool", | |
56 | "tp_pwl", 4, ""), | |
57 | m_cache_state(cache_state), | |
58 | m_image_ctx(image_ctx), | |
59 | m_log_pool_size(DEFAULT_POOL_SIZE), |
60 | m_image_writeback(image_writeback), |
61 | m_plugin_api(plugin_api), | |
62 | m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name( | |
63 | "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))), | |
64 | m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"), | |
65 | m_log_append_lock(ceph::make_mutex(pwl::unique_lock_name( | |
66 | "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))), | |
67 | m_lock(ceph::make_mutex(pwl::unique_lock_name( | |
68 | "librbd::cache::pwl::AbstractWriteLog::m_lock", this))), | |
69 | m_blocks_to_log_entries(image_ctx.cct), | |
70 | m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue", | |
71 | ceph::make_timespan( | |
72 | image_ctx.config.template get_val<uint64_t>( | |
73 | "rbd_op_thread_timeout")), | |
74 | &m_thread_pool) | |
75 | { | |
76 | CephContext *cct = m_image_ctx.cct; | |
77 | m_plugin_api.get_image_timer_instance(cct, &m_timer, &m_timer_lock); | |
78 | } | |
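/* Note: m_timer and m_timer_lock obtained above appear to be shared,
 * image-wide instances provided through the plugin API; the destructor
 * below only cancels the pending stats event and never deletes the timer
 * itself. */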
79 | ||
80 | template <typename I> | |
81 | AbstractWriteLog<I>::~AbstractWriteLog() { | |
82 | ldout(m_image_ctx.cct, 15) << "enter" << dendl; | |
83 | { | |
84 | std::lock_guard timer_locker(*m_timer_lock); | |
85 | std::lock_guard locker(m_lock); | |
86 | m_timer->cancel_event(m_timer_ctx); | |
87 | m_thread_pool.stop(); | |
88 | ceph_assert(m_deferred_ios.size() == 0); | |
89 | ceph_assert(m_ops_to_flush.size() == 0); | |
90 | ceph_assert(m_ops_to_append.size() == 0); | |
91 | ceph_assert(m_flush_ops_in_flight == 0); | |
92 | ||
93 | delete m_cache_state; | |
94 | m_cache_state = nullptr; | |
95 | } | |
96 | ldout(m_image_ctx.cct, 15) << "exit" << dendl; | |
97 | } | |
98 | ||
99 | template <typename I> | |
100 | void AbstractWriteLog<I>::perf_start(std::string name) { | |
101 | PerfCountersBuilder plb(m_image_ctx.cct, name, l_librbd_pwl_first, | |
102 | l_librbd_pwl_last); | |
103 | ||
104 | // Latency axis configuration for op histograms, values are in nanoseconds | |
105 | PerfHistogramCommon::axis_config_d op_hist_x_axis_config{ | |
106 | "Latency (nsec)", | |
107 | PerfHistogramCommon::SCALE_LOG2, ///< Latency in logarithmic scale | |
108 | 0, ///< Start at 0 | |
109 | 5000, ///< Quantization unit is 5usec | |
110 | 16, ///< Ranges into the mS | |
111 | }; | |
112 | ||
113 | // Syncpoint logentry number x-axis configuration for op histograms | |
114 | PerfHistogramCommon::axis_config_d sp_logentry_number_config{ | |
115 | "logentry number", | |
116 | PerfHistogramCommon::SCALE_LINEAR, // log entry number in linear scale | |
117 | 0, // Start at 0 | |
118 | 1, // Quantization unit is 1 | |
119 | 260, // Up to 260 > (MAX_WRITES_PER_SYNC_POINT) | |
120 | }; | |
121 | ||
122 | // Syncpoint bytes number y-axis configuration for op histogram | |
123 | PerfHistogramCommon::axis_config_d sp_bytes_number_config{ | |
124 | "Number of SyncPoint", | |
125 | PerfHistogramCommon::SCALE_LOG2, // Request size in logarithmic scale | |
126 | 0, // Start at 0 | |
127 | 512, // Quantization unit is 512 | |
128 | 17, // Writes up to 8M >= MAX_BYTES_PER_SYNC_POINT | |
129 | }; | |
130 | ||
131 | // Op size axis configuration for op histogram y axis, values are in bytes | |
132 | PerfHistogramCommon::axis_config_d op_hist_y_axis_config{ | |
133 | "Request size (bytes)", | |
134 | PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale | |
135 | 0, ///< Start at 0 | |
136 | 512, ///< Quantization unit is 512 bytes | |
137 | 16, ///< Writes up to >32k | |
138 | }; | |
139 | ||
140 | // Num items configuration for op histogram y axis, values are in items | |
141 | PerfHistogramCommon::axis_config_d op_hist_y_axis_count_config{ | |
142 | "Number of items", | |
143 | PerfHistogramCommon::SCALE_LINEAR, ///< Number of items in linear scale | |
144 | 0, ///< Start at 0 | |
145 | 1, ///< Quantization unit is 1 | |
146 | 32, ///< Up to 32 items | |
147 | }; | |
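/* Each 2D histogram registered below pairs one x-axis config with one
 * y-axis config, e.g. op_hist_x_axis_config (latency, logarithmic buckets
 * with a 5 usec quantum) against op_hist_y_axis_config (request size,
 * logarithmic buckets of 512 bytes), so each sample falls into a single
 * (latency bucket, size bucket) cell. */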
148 | ||
149 | plb.add_u64_counter(l_librbd_pwl_rd_req, "rd", "Reads"); | |
150 | plb.add_u64_counter(l_librbd_pwl_rd_bytes, "rd_bytes", "Data size in reads"); | |
151 | plb.add_time_avg(l_librbd_pwl_rd_latency, "rd_latency", "Latency of reads"); | |
152 | ||
153 | plb.add_u64_counter(l_librbd_pwl_rd_hit_req, "hit_rd", "Reads completely hitting RWL"); | |
154 | plb.add_u64_counter(l_librbd_pwl_rd_hit_bytes, "rd_hit_bytes", "Bytes read from RWL"); | |
155 | plb.add_time_avg(l_librbd_pwl_rd_hit_latency, "hit_rd_latency", "Latency of read hits"); | |
156 | ||
157 | plb.add_u64_counter(l_librbd_pwl_rd_part_hit_req, "part_hit_rd", "reads partially hitting RWL"); | |
158 | ||
159 | plb.add_u64_counter_histogram( | |
160 | l_librbd_pwl_syncpoint_hist, "syncpoint_logentry_bytes_histogram", | |
161 | sp_logentry_number_config, sp_bytes_number_config, | |
162 | "Histogram of syncpoint's logentry numbers vs bytes number"); | |
163 | ||
164 | plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes"); | |
165 | plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes"); |
166 | plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources"); |
167 | plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes"); | |
168 | plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries"); | |
169 | plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers"); | |
170 | plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes"); | |
171 | plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)"); | |
172 | |
173 | plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends"); | |
174 | plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes"); | |
175 | ||
176 | plb.add_time_avg( | |
177 | l_librbd_pwl_req_arr_to_all_t, "req_arr_to_all_t", | |
178 | "Average arrival to allocation time (time deferred for overlap)"); | |
179 | plb.add_time_avg( | |
180 | l_librbd_pwl_req_arr_to_dis_t, "req_arr_to_dis_t", | |
181 | "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); | |
182 | plb.add_time_avg( | |
183 | l_librbd_pwl_req_all_to_dis_t, "req_all_to_dis_t", | |
184 | "Average allocation to dispatch time (time deferred for log resources)"); | |
185 | plb.add_time_avg( | |
186 | l_librbd_pwl_wr_latency, "wr_latency", | |
187 | "Latency of writes (persistent completion)"); | |
188 | plb.add_u64_counter_histogram( | |
189 | l_librbd_pwl_wr_latency_hist, "wr_latency_bytes_histogram", | |
190 | op_hist_x_axis_config, op_hist_y_axis_config, | |
191 | "Histogram of write request latency (nanoseconds) vs. bytes written"); | |
192 | plb.add_time_avg( | |
193 | l_librbd_pwl_wr_caller_latency, "caller_wr_latency", | |
194 | "Latency of write completion to caller"); | |
195 | plb.add_time_avg( | |
196 | l_librbd_pwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t", | |
197 | "Average arrival to allocation time (time deferred for overlap)"); | |
198 | plb.add_time_avg( | |
199 | l_librbd_pwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t", | |
200 | "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); | |
201 | plb.add_time_avg( | |
202 | l_librbd_pwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t", | |
203 | "Average allocation to dispatch time (time deferred for log resources)"); | |
204 | plb.add_time_avg( | |
205 | l_librbd_pwl_nowait_wr_latency, "wr_latency_nw", | |
206 | "Latency of writes (persistent completion) not deferred for free space"); | |
207 | plb.add_u64_counter_histogram( | |
208 | l_librbd_pwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram", | |
209 | op_hist_x_axis_config, op_hist_y_axis_config, | |
210 | "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space"); | |
211 | plb.add_time_avg( | |
212 | l_librbd_pwl_nowait_wr_caller_latency, "caller_wr_latency_nw", | |
213 | "Latency of write completion to callerfor writes not deferred for free space"); | |
214 | plb.add_time_avg(l_librbd_pwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time"); | |
215 | plb.add_u64_counter_histogram( | |
216 | l_librbd_pwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram", | |
217 | op_hist_x_axis_config, op_hist_y_axis_config, | |
218 | "Histogram of buffer pmemobj_reserve() time (nanoseconds) vs. bytes written"); | |
219 | plb.add_time_avg(l_librbd_pwl_log_op_dis_to_buf_t, "op_dis_to_buf_t", "Average dispatch to buffer persist time"); | |
220 | plb.add_time_avg(l_librbd_pwl_log_op_dis_to_app_t, "op_dis_to_app_t", "Average dispatch to log append time"); | |
221 | plb.add_time_avg(l_librbd_pwl_log_op_dis_to_cmp_t, "op_dis_to_cmp_t", "Average dispatch to persist completion time"); | |
222 | plb.add_u64_counter_histogram( | |
223 | l_librbd_pwl_log_op_dis_to_cmp_t_hist, "op_dis_to_cmp_t_bytes_histogram", | |
224 | op_hist_x_axis_config, op_hist_y_axis_config, | |
225 | "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written"); | |
226 | ||
227 | plb.add_time_avg( | |
228 | l_librbd_pwl_log_op_buf_to_app_t, "op_buf_to_app_t", | |
229 | "Average buffer persist to log append time (write data persist/replicate + wait for append time)"); | |
230 | plb.add_time_avg( | |
231 | l_librbd_pwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t", | |
232 | "Average buffer persist time (write data persist/replicate time)"); | |
233 | plb.add_u64_counter_histogram( | |
234 | l_librbd_pwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram", | |
235 | op_hist_x_axis_config, op_hist_y_axis_config, | |
236 | "Histogram of write buffer persist time (nanoseconds) vs. bytes written"); | |
237 | plb.add_time_avg( | |
238 | l_librbd_pwl_log_op_app_to_cmp_t, "op_app_to_cmp_t", | |
239 | "Average log append to persist complete time (log entry append/replicate + wait for complete time)"); | |
240 | plb.add_time_avg( | |
241 | l_librbd_pwl_log_op_app_to_appc_t, "op_app_to_appc_t", | |
242 | "Average log append to persist complete time (log entry append/replicate time)"); | |
243 | plb.add_u64_counter_histogram( | |
244 | l_librbd_pwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram", | |
245 | op_hist_x_axis_config, op_hist_y_axis_config, | |
246 | "Histogram of log append persist time (nanoseconds) (vs. op bytes)"); | |
247 | ||
248 | plb.add_u64_counter(l_librbd_pwl_discard, "discard", "Discards"); | |
249 | plb.add_u64_counter(l_librbd_pwl_discard_bytes, "discard_bytes", "Bytes discarded"); | |
250 | plb.add_time_avg(l_librbd_pwl_discard_latency, "discard_lat", "Discard latency"); | |
251 | ||
252 | plb.add_u64_counter(l_librbd_pwl_aio_flush, "aio_flush", "AIO flush (flush to RWL)"); | |
253 | plb.add_u64_counter(l_librbd_pwl_aio_flush_def, "aio_flush_def", "AIO flushes deferred for resources"); | |
254 | plb.add_time_avg(l_librbd_pwl_aio_flush_latency, "aio_flush_lat", "AIO flush latency"); | |
255 | ||
256 | plb.add_u64_counter(l_librbd_pwl_ws,"ws", "Write Sames"); | |
257 | plb.add_u64_counter(l_librbd_pwl_ws_bytes, "ws_bytes", "Write Same bytes to image"); | |
258 | plb.add_time_avg(l_librbd_pwl_ws_latency, "ws_lat", "Write Same latency"); | |
259 | ||
260 | plb.add_u64_counter(l_librbd_pwl_cmp, "cmp", "Compare and Write requests"); | |
261 | plb.add_u64_counter(l_librbd_pwl_cmp_bytes, "cmp_bytes", "Compare and Write bytes compared/written"); | |
262 | plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latency"); |
263 | plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails"); | |
264 | ||
265 | plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)"); |
266 | plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "write back to OSD latency"); | |
267 | plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL"); |
268 | plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "discard", "Discard and invalidate RWL"); | |
269 | ||
270 | plb.add_time_avg(l_librbd_pwl_append_tx_t, "append_tx_lat", "Log append transaction latency"); | |
271 | plb.add_u64_counter_histogram( | |
272 | l_librbd_pwl_append_tx_t_hist, "append_tx_lat_histogram", | |
273 | op_hist_x_axis_config, op_hist_y_axis_count_config, | |
274 | "Histogram of log append transaction time (nanoseconds) vs. entries appended"); | |
275 | plb.add_time_avg(l_librbd_pwl_retire_tx_t, "retire_tx_lat", "Log retire transaction latency"); | |
276 | plb.add_u64_counter_histogram( | |
277 | l_librbd_pwl_retire_tx_t_hist, "retire_tx_lat_histogram", | |
278 | op_hist_x_axis_config, op_hist_y_axis_count_config, | |
279 | "Histogram of log retire transaction time (nanoseconds) vs. entries retired"); | |
280 | ||
281 | m_perfcounter = plb.create_perf_counters(); | |
282 | m_image_ctx.cct->get_perfcounters_collection()->add(m_perfcounter); | |
283 | } | |
284 | ||
285 | template <typename I> | |
286 | void AbstractWriteLog<I>::perf_stop() { | |
287 | ceph_assert(m_perfcounter); | |
288 | m_image_ctx.cct->get_perfcounters_collection()->remove(m_perfcounter); | |
289 | delete m_perfcounter; | |
290 | } | |
291 | ||
292 | template <typename I> | |
293 | void AbstractWriteLog<I>::log_perf() { | |
294 | bufferlist bl; | |
295 | Formatter *f = Formatter::create("json-pretty"); | |
296 | bl.append("Perf dump follows\n--- Begin perf dump ---\n"); | |
297 | bl.append("{\n"); | |
298 | stringstream ss; | |
299 | utime_t now = ceph_clock_now(); | |
300 | ss << "\"test_time\": \"" << now << "\","; | |
301 | ss << "\"image\": \"" << m_image_ctx.name << "\","; | |
302 | bl.append(ss); | |
303 | bl.append("\"stats\": "); | |
304 | m_image_ctx.cct->get_perfcounters_collection()->dump_formatted(f, 0); | |
305 | f->flush(bl); | |
306 | bl.append(",\n\"histograms\": "); | |
307 | m_image_ctx.cct->get_perfcounters_collection()->dump_formatted_histograms(f, 0); | |
308 | f->flush(bl); | |
309 | delete f; | |
310 | bl.append("}\n--- End perf dump ---\n"); | |
311 | bl.append('\0'); | |
312 | ldout(m_image_ctx.cct, 1) << bl.c_str() << dendl; | |
313 | } | |
314 | ||
315 | template <typename I> | |
316 | void AbstractWriteLog<I>::periodic_stats() { | |
317 | std::lock_guard locker(m_lock); | |
318 | ldout(m_image_ctx.cct, 1) << "STATS: m_log_entries=" << m_log_entries.size() |
319 | << ", m_dirty_log_entries=" << m_dirty_log_entries.size() | |
320 | << ", m_free_log_entries=" << m_free_log_entries | |
321 | << ", m_bytes_allocated=" << m_bytes_allocated | |
322 | << ", m_bytes_cached=" << m_bytes_cached | |
323 | << ", m_bytes_dirty=" << m_bytes_dirty | |
324 | << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated | |
325 | << ", m_first_valid_entry=" << m_first_valid_entry | |
326 | << ", m_first_free_entry=" << m_first_free_entry | |
327 | << ", m_current_sync_gen=" << m_current_sync_gen | |
328 | << ", m_flushed_sync_gen=" << m_flushed_sync_gen | |
329 | << dendl; |
330 | } | |
331 | ||
332 | template <typename I> | |
333 | void AbstractWriteLog<I>::arm_periodic_stats() { | |
334 | ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); | |
335 | if (m_periodic_stats_enabled) { | |
336 | m_timer_ctx = new LambdaContext( | |
337 | [this](int r) { | |
338 | /* m_timer_lock is held */ | |
339 | periodic_stats(); | |
340 | arm_periodic_stats(); | |
341 | }); | |
342 | m_timer->add_event_after(LOG_STATS_INTERVAL_SECONDS, m_timer_ctx); | |
343 | } | |
344 | } | |
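/* The timer callback above runs with *m_timer_lock already held (as noted
 * in the lambda), logs one stats line via periodic_stats(), and re-arms
 * itself, so a stats line is emitted roughly every
 * LOG_STATS_INTERVAL_SECONDS until the destructor cancels m_timer_ctx. */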
345 | ||
346 | template <typename I> | |
347 | void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry, | |
348 | WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points, | |
349 | std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries, | |
350 | uint64_t entry_index) { |
351 | bool writer = cache_entry->is_writer(); |
352 | if (cache_entry->is_sync_point()) { | |
353 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
354 | << " is a sync point. cache_entry=[" << *cache_entry << "]" << dendl; | |
355 | auto sync_point_entry = std::make_shared<SyncPointLogEntry>(cache_entry->sync_gen_number); | |
356 | *log_entry = sync_point_entry; | |
357 | sync_point_entries[cache_entry->sync_gen_number] = sync_point_entry; | |
358 | missing_sync_points.erase(cache_entry->sync_gen_number); | |
359 | m_current_sync_gen = cache_entry->sync_gen_number; | |
360 | } else if (cache_entry->is_write()) { | |
361 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
362 | << " is a write. cache_entry=[" << *cache_entry << "]" << dendl; | |
363 | auto write_entry = | |
364 | m_builder->create_write_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes); | |
365 | write_data_to_buffer(write_entry, cache_entry); | |
366 | *log_entry = write_entry; | |
367 | } else if (cache_entry->is_writesame()) { | |
368 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
369 | << " is a write same. cache_entry=[" << *cache_entry << "]" << dendl; | |
370 | auto ws_entry = | |
371 | m_builder->create_writesame_log_entry(nullptr, cache_entry->image_offset_bytes, | |
372 | cache_entry->write_bytes, cache_entry->ws_datalen); | |
373 | write_data_to_buffer(ws_entry, cache_entry); | |
374 | *log_entry = ws_entry; | |
375 | } else if (cache_entry->is_discard()) { | |
376 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
377 | << " is a discard. cache_entry=[" << *cache_entry << "]" << dendl; | |
378 | auto discard_entry = | |
379 | std::make_shared<DiscardLogEntry>(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes, | |
380 | m_discard_granularity_bytes); | |
381 | *log_entry = discard_entry; | |
382 | } else { | |
383 | lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index | |
384 | << ", cache_entry=[" << *cache_entry << "]" << dendl; | |
385 | } | |
386 | ||
387 | if (writer) { | |
388 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
389 | << " writes. cache_entry=[" << *cache_entry << "]" << dendl; | |
390 | if (!sync_point_entries[cache_entry->sync_gen_number]) { | |
391 | missing_sync_points[cache_entry->sync_gen_number] = true; | |
392 | } | |
393 | } | |
394 | } | |
395 | ||
396 | template <typename I> | |
397 | void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points, | |
398 | std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries, | |
399 | DeferredContexts &later) { |
400 | /* Create missing sync points. These must not be appended until the |
401 | * entry reload is complete and the write map is up to | |
402 | * date. Currently this is handled by the deferred contexts object | |
403 | * passed to new_sync_point(). These contexts won't be completed | |
404 | * until this function returns. */ | |
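/* Illustrative (hypothetical) example: if the log ends with writes tagged
 * with sync gen 8 but the crash happened before the sync point entry for
 * gen 8 itself was appended, gen 8 remains in missing_sync_points and a
 * fresh sync point is created for it here, before those writes are linked
 * to it in the second pass below. */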
405 | for (auto &kv : missing_sync_points) { | |
406 | ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl; | |
407 | if (0 == m_current_sync_gen) { | |
408 | /* The unlikely case where the log contains writing entries, but no sync | |
409 | * points (e.g. because they were all retired) */ | |
410 | m_current_sync_gen = kv.first-1; | |
411 | } | |
412 | ceph_assert(kv.first == m_current_sync_gen+1); | |
413 | init_flush_new_sync_point(later); | |
414 | ceph_assert(kv.first == m_current_sync_gen); | |
415 | sync_point_entries[kv.first] = m_current_sync_point->log_entry; |
416 | } |
417 | ||
418 | /* | |
419 | * Iterate over the log entries again (this time via the global | |
420 | * entries list), connecting write entries to their sync points and | |
421 | * updating the sync point stats. | |
422 | * | |
423 | * Add writes to the write log map. | |
424 | */ | |
425 | std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr; | |
426 | for (auto &log_entry : m_log_entries) { | |
427 | if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) { | |
428 | /* This entry is one of the types that write */ | |
429 | auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry); | |
430 | if (gen_write_entry) { | |
431 | auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number]; | |
432 | if (!sync_point_entry) { | |
433 | lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl; | |
434 | ceph_assert(false); | |
435 | } else { | |
436 | gen_write_entry->sync_point_entry = sync_point_entry; | |
437 | sync_point_entry->writes++; | |
438 | sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes; | |
439 | sync_point_entry->writes_completed++; | |
440 | m_blocks_to_log_entries.add_log_entry(gen_write_entry); | |
441 | /* This entry is only dirty if its sync gen number is > the flushed | |
442 | * sync gen number from the root object. */ | |
443 | if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) { | |
444 | m_dirty_log_entries.push_back(log_entry); | |
445 | m_bytes_dirty += gen_write_entry->bytes_dirty(); | |
446 | } else { | |
447 | gen_write_entry->set_flushed(true); | |
448 | sync_point_entry->writes_flushed++; | |
449 | } | |
450 | |
451 | /* calc m_bytes_allocated & m_bytes_cached */ | |
452 | inc_allocated_cached_bytes(log_entry); | |
453 | } |
454 | } | |
455 | } else { | |
456 | /* This entry is a sync point entry */ | |
457 | auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry); | |
458 | if (sync_point_entry) { | |
459 | if (previous_sync_point_entry) { | |
460 | previous_sync_point_entry->next_sync_point_entry = sync_point_entry; | |
461 | if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) { | |
462 | sync_point_entry->prior_sync_point_flushed = false; | |
463 | ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed || | |
464 | (0 == previous_sync_point_entry->writes) || | |
465 | (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed)); | |
466 | } else { | |
467 | sync_point_entry->prior_sync_point_flushed = true; | |
468 | ceph_assert(previous_sync_point_entry->prior_sync_point_flushed); | |
469 | ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed); | |
470 | } | |
471 | } else { | |
472 | /* There are no previous sync points, so we'll consider them flushed */ | |
473 | sync_point_entry->prior_sync_point_flushed = true; | |
474 | } | |
475 | previous_sync_point_entry = sync_point_entry; | |
476 | ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << dendl; | |
477 | } | |
478 | } | |
479 | } | |
480 | if (0 == m_current_sync_gen) { | |
481 | /* If a re-opened log was completely flushed, we'll have found no sync point entries here, | |
482 | * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync | |
483 | * point recorded in the log. */ | |
484 | m_current_sync_gen = m_flushed_sync_gen; | |
485 | } | |
486 | } | |
487 | ||
488 | template <typename I> | |
489 | void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) { | |
490 | CephContext *cct = m_image_ctx.cct; | |
491 | ldout(cct, 20) << dendl; | |
492 | ceph_assert(m_cache_state); | |
493 | std::lock_guard locker(m_lock); | |
494 | ceph_assert(!m_initialized); | |
495 | ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl; | |
496 | ||
497 | if (!m_cache_state->present) { | |
498 | m_cache_state->host = ceph_get_short_hostname(); | |
499 | m_cache_state->size = m_image_ctx.config.template get_val<uint64_t>( | |
500 | "rbd_persistent_cache_size"); | |
501 | ||
502 | string path = m_image_ctx.config.template get_val<string>( | |
503 | "rbd_persistent_cache_path"); | |
504 | std::string pool_name = m_image_ctx.md_ctx.get_pool_name(); | |
505 | m_cache_state->path = path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".pool"; | |
506 | } | |
507 | ||
508 | ldout(cct,5) << "pwl_size: " << m_cache_state->size << dendl; | |
509 | ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl; | |
510 | ||
511 | m_log_pool_name = m_cache_state->path; | |
512 | m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE); |
513 | m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN); | |
514 | ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size | |
515 | << " (adjusted from " << m_cache_state->size << ")" << dendl; | |
516 | |
517 | if ((!m_cache_state->present) && | |
518 | (access(m_log_pool_name.c_str(), F_OK) == 0)) { | |
519 | ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name | |
520 | << ", While there's no cache in the image metatata." << dendl; | |
521 | if (remove(m_log_pool_name.c_str()) != 0) { | |
522 | lderr(cct) << "failed to remove the pool file " << m_log_pool_name |
523 | << dendl; |
524 | on_finish->complete(-errno); | |
525 | return; | |
526 | } else { | |
527 | ldout(cct, 5) << "Removed the existing pool file." << dendl; | |
528 | } | |
529 | } else if ((m_cache_state->present) && | |
530 | (access(m_log_pool_name.c_str(), F_OK) != 0)) { | |
531 | lderr(cct) << "can't find the existed pool file: " << m_log_pool_name |
532 | << ". error: " << cpp_strerror(-errno) << dendl; | |
533 | on_finish->complete(-errno); |
534 | return; |
535 | } |
536 | ||
537 | bool succeeded = initialize_pool(on_finish, later); |
538 | if (!succeeded) { | |
539 | return; | |
540 | } | |
541 | |
542 | ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries | |
543 | << " log entries, " << m_free_log_entries << " of which are free." | |
544 | << " first_valid=" << m_first_valid_entry | |
545 | << ", first_free=" << m_first_free_entry | |
546 | << ", flushed_sync_gen=" << m_flushed_sync_gen | |
547 | << ", m_current_sync_gen=" << m_current_sync_gen << dendl; | |
548 | if (m_first_free_entry == m_first_valid_entry) { | |
549 | ldout(cct,1) << "write log is empty" << dendl; | |
550 | m_cache_state->empty = true; | |
551 | } | |
552 | ||
553 | /* Start the sync point following the last one seen in the | |
554 | * log. Flush the last sync point created during the loading of the | |
555 | * existing log entries. */ | |
556 | init_flush_new_sync_point(later); | |
557 | ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl; | |
558 | ||
559 | m_initialized = true; | |
560 | // Start the thread | |
561 | m_thread_pool.start(); | |
562 | ||
563 | m_periodic_stats_enabled = m_cache_state->log_periodic_stats; | |
564 | /* Do these after we drop lock */ | |
565 | later.add(new LambdaContext([this](int r) { | |
566 | if (m_periodic_stats_enabled) { | |
567 | /* Log stats for the first time */ | |
568 | periodic_stats(); | |
569 | /* Arm periodic stats logging for the first time */ | |
570 | std::lock_guard timer_locker(*m_timer_lock); | |
571 | arm_periodic_stats(); | |
572 | } | |
573 | })); | |
574 | m_image_ctx.op_work_queue->queue(on_finish, 0); | |
575 | } | |
576 | ||
577 | template <typename I> | |
578 | void AbstractWriteLog<I>::update_image_cache_state(Context *on_finish) { | |
579 | m_cache_state->write_image_cache_state(on_finish); | |
580 | } | |
581 | ||
582 | template <typename I> | |
583 | void AbstractWriteLog<I>::init(Context *on_finish) { | |
584 | CephContext *cct = m_image_ctx.cct; | |
585 | ldout(cct, 20) << dendl; | |
586 | auto pname = std::string("librbd-pwl-") + m_image_ctx.id + |
587 | std::string("-") + m_image_ctx.md_ctx.get_pool_name() + | |
588 | std::string("-") + m_image_ctx.name; | |
589 | perf_start(pname); | |
590 | |
591 | ceph_assert(!m_initialized); | |
592 | ||
593 | Context *ctx = new LambdaContext( | |
594 | [this, on_finish](int r) { | |
595 | if (r >= 0) { | |
596 | update_image_cache_state(on_finish); | |
597 | } else { | |
598 | on_finish->complete(r); | |
599 | } | |
600 | }); | |
601 | ||
602 | DeferredContexts later; | |
603 | pwl_init(ctx, later); | |
604 | } | |
605 | ||
606 | template <typename I> | |
607 | void AbstractWriteLog<I>::shut_down(Context *on_finish) { | |
608 | CephContext *cct = m_image_ctx.cct; | |
609 | ldout(cct, 20) << dendl; | |
610 | ||
611 | ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl; | |
612 | ||
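/* The LambdaContexts below are constructed in reverse order of execution:
 * the last ctx created (the internal flush) runs first and each context
 * wraps the previously constructed one. Rough runtime order:
 * internal_flush -> re-queue on the work queue -> flush dirty entries ->
 * wait for in-flight ops -> clean up state, remove the pool file and stop
 * perf counters -> persist the image cache state -> complete on_finish. */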
613 | Context *ctx = new LambdaContext( | |
614 | [this, on_finish](int r) { | |
615 | ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl; | |
616 | m_image_ctx.op_work_queue->queue(on_finish, r); | |
617 | }); | |
618 | ctx = new LambdaContext( | |
619 | [this, ctx](int r) { | |
620 | ldout(m_image_ctx.cct, 6) << "image cache cleaned" << dendl; | |
621 | Context *next_ctx = override_ctx(r, ctx); | |
622 | bool periodic_stats_enabled = m_periodic_stats_enabled; | |
623 | m_periodic_stats_enabled = false; | |
624 | ||
625 | if (periodic_stats_enabled) { | |
626 | /* Log stats one last time if they were enabled */ | |
627 | periodic_stats(); | |
628 | } | |
629 | { | |
630 | std::lock_guard locker(m_lock); | |
631 | check_image_cache_state_clean(); |
f67539c2 TL |
632 | m_wake_up_enabled = false; |
633 | m_cache_state->clean = true; | |
634 | m_log_entries.clear(); | |
635 | ||
636 | remove_pool_file(); | |
637 | ||
638 | if (m_perfcounter) { | |
639 | perf_stop(); | |
640 | } | |
641 | } | |
642 | update_image_cache_state(next_ctx); | |
643 | }); | |
644 | ctx = new LambdaContext( |
645 | [this, ctx](int r) { | |
646 | Context *next_ctx = override_ctx(r, ctx); | |
647 | ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl; | |
648 | // Wait for in progress IOs to complete | |
649 | next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx); | |
650 | m_async_op_tracker.wait_for_ops(next_ctx); | |
651 | }); | |
652 | ctx = new LambdaContext( |
653 | [this, ctx](int r) { | |
654 | Context *next_ctx = override_ctx(r, ctx); | |
655 | { | |
656 | /* Sync with process_writeback_dirty_entries() */ | |
657 | RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock); | |
658 | m_shutting_down = true; | |
659 | /* Flush all writes to OSDs (unless disabled) and wait for all | |
660 | in-progress flush writes to complete */ | |
661 | ldout(m_image_ctx.cct, 6) << "flushing" << dendl; | |
662 | if (m_periodic_stats_enabled) { | |
663 | periodic_stats(); | |
664 | } | |
665 | } | |
666 | flush_dirty_entries(next_ctx); | |
667 | }); | |
668 | ctx = new LambdaContext( |
669 | [this, ctx](int r) { | |
670 | ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl; | |
671 | m_work_queue.queue(ctx, r); | |
672 | }); | |
673 | /* Complete all in-flight writes before shutting down */ | |
674 | ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl; | |
675 | internal_flush(false, ctx); | |
676 | } | |
677 | ||
678 | template <typename I> | |
679 | void AbstractWriteLog<I>::read(Extents&& image_extents, | |
680 | ceph::bufferlist* bl, | |
681 | int fadvise_flags, Context *on_finish) { | |
682 | CephContext *cct = m_image_ctx.cct; | |
683 | utime_t now = ceph_clock_now(); | |
684 | ||
685 | on_finish = new LambdaContext( | |
686 | [this, on_finish](int r) { | |
687 | m_async_op_tracker.finish_op(); | |
688 | on_finish->complete(r); | |
689 | }); | |
690 | C_ReadRequest *read_ctx = m_builder->create_read_request( | |
691 | cct, now, m_perfcounter, bl, on_finish); | |
692 | ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id | |
693 | << "image_extents=" << image_extents |
694 | << ", bl=" << bl | |
695 | << ", on_finish=" << on_finish << dendl; | |
696 | |
697 | ceph_assert(m_initialized); | |
698 | bl->clear(); | |
699 | m_perfcounter->inc(l_librbd_pwl_rd_req, 1); | |
700 | ||
701 | std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read; |
702 | std::vector<bufferlist*> bls_to_read; |
703 | ||
704 | m_async_op_tracker.start_op(); | |
705 | Context *ctx = new LambdaContext( | |
706 | [this, read_ctx, fadvise_flags](int r) { | |
707 | if (read_ctx->miss_extents.empty()) { | |
708 | /* All of this read comes from RWL */ | |
709 | read_ctx->complete(0); | |
710 | } else { | |
711 | /* Pass the read misses on to the layer below RWL */ | |
712 | m_image_writeback.aio_read( | |
713 | std::move(read_ctx->miss_extents), &read_ctx->miss_bl, | |
714 | fadvise_flags, read_ctx); | |
715 | } | |
716 | }); | |
717 | ||
718 | /* | |
719 | * The strategy here is to look up all the WriteLogMapEntries that overlap | |
720 | * this read, and iterate through those to separate this read into hits and | |
721 | * misses. A new Extents object is produced here with Extents for each miss | |
722 | * region. The miss Extents is then passed on to the read cache below RWL. We | |
723 | * also produce an ImageExtentBufs for all the extents (hit or miss) in this | |
724 | * read. When the read from the lower cache layer completes, we iterate | |
725 | * through the ImageExtentBufs and insert buffers for each cache hit at the | |
726 | * appropriate spot in the bufferlist returned from below for the miss | |
727 | * read. The buffers we insert here refer directly to regions of various | |
728 | * write log entry data buffers. | |
729 | * | |
730 | * Locking: These buffer objects hold a reference on the write log entries | |
731 | * they refer to. Log entries can't be retired until there are no references. | |
732 | * The GenericWriteLogEntry references are released by the buffer destructor. | |
733 | */ | |
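/* Hypothetical example of the split performed below: for a read of
 * (offset=0, len=8192) where the log map only covers [4096, 6144), the
 * loop emits a miss extent [0, 4096), a hit served from the log entry
 * buffer for [4096, 6144), and a trailing miss extent [6144, 8192);
 * read_extents then holds all three pieces in image order so the final
 * bufferlist can be stitched together once the lower-layer read returns. */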
734 | for (auto &extent : image_extents) { | |
735 | uint64_t extent_offset = 0; | |
736 | RWLock::RLocker entry_reader_locker(m_entry_reader_lock); | |
737 | WriteLogMapEntries map_entries = m_blocks_to_log_entries.find_map_entries( | |
738 | block_extent(extent)); | |
739 | for (auto &map_entry : map_entries) { | |
740 | Extent entry_image_extent(pwl::image_extent(map_entry.block_extent)); | |
741 | /* If this map entry starts after the current image extent offset ... */ | |
742 | if (entry_image_extent.first > extent.first + extent_offset) { | |
743 | /* ... add range before map_entry to miss extents */ | |
744 | uint64_t miss_extent_start = extent.first + extent_offset; | |
745 | uint64_t miss_extent_length = entry_image_extent.first - | |
746 | miss_extent_start; | |
747 | Extent miss_extent(miss_extent_start, miss_extent_length); | |
748 | read_ctx->miss_extents.push_back(miss_extent); | |
749 | /* Add miss range to read extents */ | |
750 | auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent); | |
751 | read_ctx->read_extents.push_back(miss_extent_buf); | |
752 | extent_offset += miss_extent_length; | |
753 | } | |
754 | ceph_assert(entry_image_extent.first <= extent.first + extent_offset); | |
755 | uint64_t entry_offset = 0; | |
756 | /* If this map entry starts before the current image extent offset ... */ | |
757 | if (entry_image_extent.first < extent.first + extent_offset) { | |
758 | /* ... compute offset into log entry for this read extent */ | |
759 | entry_offset = (extent.first + extent_offset) - entry_image_extent.first; | |
760 | } | |
761 | /* This read hit ends at the end of the extent or the end of the log | |
762 | entry, whichever is less. */ | |
763 | uint64_t entry_hit_length = min(entry_image_extent.second - entry_offset, | |
764 | extent.second - extent_offset); | |
765 | Extent hit_extent(entry_image_extent.first, entry_hit_length); | |
766 | if (0 == map_entry.log_entry->write_bytes() && | |
767 | 0 < map_entry.log_entry->bytes_dirty()) { | |
768 | /* discard log entry */ | |
769 | ldout(cct, 20) << "discard log entry" << dendl; | |
770 | auto discard_entry = map_entry.log_entry; | |
771 | ldout(cct, 20) << "read hit on discard entry: log_entry=" | |
772 | << *discard_entry | |
773 | << dendl; | |
774 | /* Discards read as zero, so we'll construct a bufferlist of zeros */ | |
775 | bufferlist zero_bl; | |
776 | zero_bl.append_zero(entry_hit_length); | |
777 | /* Add hit extent to read extents */ | |
778 | auto hit_extent_buf = std::make_shared<ImageExtentBuf>( | |
779 | hit_extent, zero_bl); | |
780 | read_ctx->read_extents.push_back(hit_extent_buf); | |
781 | } else { | |
782 | ldout(cct, 20) << "write or writesame log entry" << dendl; | |
783 | /* write and writesame log entry */ | |
784 | /* Offset of the map entry into the log entry's buffer */ | |
785 | uint64_t map_entry_buffer_offset = entry_image_extent.first - | |
786 | map_entry.log_entry->ram_entry.image_offset_bytes; | |
787 | /* Offset into the log entry buffer of this read hit */ | |
788 | uint64_t read_buffer_offset = map_entry_buffer_offset + entry_offset; | |
789 | /* Create buffer object referring to pmem pool for this read hit */ | |
790 | collect_read_extents( | |
791 | read_buffer_offset, map_entry, log_entries_to_read, bls_to_read, | |
792 | entry_hit_length, hit_extent, read_ctx); | |
793 | } | |
794 | /* Exclude RWL hit range from buffer and extent */ | |
795 | extent_offset += entry_hit_length; | |
796 | ldout(cct, 20) << map_entry << dendl; | |
797 | } | |
798 | /* If the last map entry didn't consume the entire image extent ... */ | |
799 | if (extent.second > extent_offset) { | |
800 | /* ... add the rest of this extent to miss extents */ | |
801 | uint64_t miss_extent_start = extent.first + extent_offset; | |
802 | uint64_t miss_extent_length = extent.second - extent_offset; | |
803 | Extent miss_extent(miss_extent_start, miss_extent_length); | |
804 | read_ctx->miss_extents.push_back(miss_extent); | |
805 | /* Add miss range to read extents */ | |
806 | auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent); | |
807 | read_ctx->read_extents.push_back(miss_extent_buf); | |
808 | extent_offset += miss_extent_length; | |
809 | } | |
810 | } | |
811 | ||
812 | ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents |
813 | << ", miss_bl=" << read_ctx->miss_bl << dendl; | |
814 | |
815 | complete_read(log_entries_to_read, bls_to_read, ctx); | |
816 | } | |
817 | ||
818 | template <typename I> | |
819 | void AbstractWriteLog<I>::write(Extents &&image_extents, | |
820 | bufferlist&& bl, | |
821 | int fadvise_flags, | |
822 | Context *on_finish) { | |
823 | CephContext *cct = m_image_ctx.cct; | |
824 | ||
825 | ldout(cct, 20) << "aio_write" << dendl; | |
826 | ||
827 | utime_t now = ceph_clock_now(); | |
828 | m_perfcounter->inc(l_librbd_pwl_wr_req, 1); | |
829 | ||
830 | ceph_assert(m_initialized); | |
831 | ||
832 | /* Split image extents larger than 1M. This isn't strictly necessary but |
833 | * makes libpmemobj allocator's job easier and reduces pmemobj_defrag() cost. | |
834 | * We plan to manage pmem space and allocation by ourselves in the future. | |
835 | */ |
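/* Illustrative numbers only: if get_max_extent() returned 1 MiB, a single
 * 2.5 MiB write at offset 0 would be split into three extents of 1 MiB,
 * 1 MiB and 0.5 MiB before the write request is built; when it returns 0,
 * the extents are passed through unchanged. */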
836 | Extents split_image_extents; | |
837 | uint64_t max_extent_size = get_max_extent(); | |
838 | if (max_extent_size != 0) { | |
839 | for (auto extent : image_extents) { | |
840 | if (extent.second > max_extent_size) { | |
841 | uint64_t off = extent.first; | |
842 | uint64_t extent_bytes = extent.second; | |
843 | for (int i = 0; extent_bytes != 0; ++i) { | |
844 | Extent _ext; | |
845 | _ext.first = off + i * max_extent_size; | |
846 | _ext.second = std::min(max_extent_size, extent_bytes); | |
847 | extent_bytes = extent_bytes - _ext.second ; | |
848 | split_image_extents.emplace_back(_ext); | |
849 | } | |
850 | } else { | |
851 | split_image_extents.emplace_back(extent); | |
852 | } | |
853 | } | |
854 | } else { | |
855 | split_image_extents = image_extents; | |
856 | } | |
857 | ||
858 | C_WriteRequestT *write_req = | |
859 | m_builder->create_write_request(*this, now, std::move(split_image_extents), | |
860 | std::move(bl), fadvise_flags, m_lock, | |
861 | m_perfcounter, on_finish); | |
862 | m_perfcounter->inc(l_librbd_pwl_wr_bytes, | |
863 | write_req->image_extents_summary.total_bytes); | |
864 | ||
865 | /* The lambda below will be called when the block guard for all | |
866 | * blocks affected by this write is obtained */ | |
867 | GuardedRequestFunctionContext *guarded_ctx = | |
868 | new GuardedRequestFunctionContext([this, | |
869 | write_req](GuardedRequestFunctionContext &guard_ctx) { | |
870 | write_req->blockguard_acquired(guard_ctx); | |
871 | alloc_and_dispatch_io_req(write_req); | |
872 | }); | |
873 | ||
874 | detain_guarded_request(write_req, guarded_ctx, false); | |
875 | } | |
876 | ||
877 | template <typename I> | |
878 | void AbstractWriteLog<I>::discard(uint64_t offset, uint64_t length, | |
879 | uint32_t discard_granularity_bytes, | |
880 | Context *on_finish) { | |
881 | CephContext *cct = m_image_ctx.cct; | |
882 | ||
883 | ldout(cct, 20) << dendl; | |
884 | ||
885 | utime_t now = ceph_clock_now(); | |
886 | m_perfcounter->inc(l_librbd_pwl_discard, 1); | |
887 | Extents discard_extents = {{offset, length}}; | |
888 | m_discard_granularity_bytes = discard_granularity_bytes; | |
889 | ||
890 | ceph_assert(m_initialized); | |
891 | ||
892 | auto *discard_req = | |
893 | new C_DiscardRequestT(*this, now, std::move(discard_extents), discard_granularity_bytes, | |
894 | m_lock, m_perfcounter, on_finish); | |
895 | ||
896 | /* The lambda below will be called when the block guard for all | |
897 | * blocks affected by this write is obtained */ | |
898 | GuardedRequestFunctionContext *guarded_ctx = | |
899 | new GuardedRequestFunctionContext([this, discard_req](GuardedRequestFunctionContext &guard_ctx) { | |
900 | discard_req->blockguard_acquired(guard_ctx); | |
901 | alloc_and_dispatch_io_req(discard_req); | |
902 | }); | |
903 | ||
904 | detain_guarded_request(discard_req, guarded_ctx, false); | |
905 | } | |
906 | ||
907 | /** | |
908 | * Aio_flush completes when all previously completed writes are | |
909 | * flushed to persistent cache. We make a best-effort attempt to also | |
910 | * defer until all in-progress writes complete, but we may not know | |
911 | * about all of the writes the application considers in-progress yet, | |
912 | * due to uncertainty in the IO submission workq (multiple WQ threads | |
913 | * may allow out-of-order submission). | |
914 | * | |
915 | * This flush operation will not wait for writes deferred for overlap | |
916 | * in the block guard. | |
917 | */ | |
918 | template <typename I> | |
919 | void AbstractWriteLog<I>::flush(io::FlushSource flush_source, Context *on_finish) { | |
920 | CephContext *cct = m_image_ctx.cct; | |
921 | ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl; | |
922 | ||
923 | if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source || | |
924 | io::FLUSH_SOURCE_WRITE_BLOCK == flush_source) { | |
925 | internal_flush(false, on_finish); | |
926 | return; | |
927 | } | |
928 | m_perfcounter->inc(l_librbd_pwl_aio_flush, 1); | |
929 | ||
930 | /* May be called even if initialization fails */ | |
931 | if (!m_initialized) { | |
932 | ldout(cct, 05) << "never initialized" << dendl; | |
933 | /* Deadlock if completed here */ | |
934 | m_image_ctx.op_work_queue->queue(on_finish, 0); | |
935 | return; | |
936 | } | |
937 | ||
938 | { | |
939 | std::shared_lock image_locker(m_image_ctx.image_lock); | |
940 | if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) { | |
941 | on_finish->complete(-EROFS); | |
942 | return; | |
943 | } | |
944 | } | |
945 | ||
946 | auto flush_req = make_flush_req(on_finish); | |
947 | ||
948 | GuardedRequestFunctionContext *guarded_ctx = | |
949 | new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) { | |
950 | ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl; | |
951 | ceph_assert(guard_ctx.cell); | |
952 | flush_req->detained = guard_ctx.state.detained; | |
953 | /* We don't call flush_req->set_cell(), because the block guard will be released here */ | |
954 | { | |
955 | DeferredContexts post_unlock; /* Do these when the lock below is released */ | |
956 | std::lock_guard locker(m_lock); | |
957 | ||
958 | if (!m_persist_on_flush && m_persist_on_write_until_flush) { | |
959 | m_persist_on_flush = true; | |
960 | ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl; | |
961 | } | |
962 | ||
963 | /* | |
964 | * Create a new sync point if there have been writes since the last | |
965 | * one. | |
966 | * | |
967 | * We do not flush the caches below the RWL here. | |
968 | */ | |
969 | flush_new_sync_point_if_needed(flush_req, post_unlock); | |
970 | } | |
971 | ||
972 | release_guarded_request(guard_ctx.cell); | |
973 | }); | |
974 | ||
975 | detain_guarded_request(flush_req, guarded_ctx, true); | |
976 | } | |
977 | ||
978 | template <typename I> | |
979 | void AbstractWriteLog<I>::writesame(uint64_t offset, uint64_t length, | |
980 | bufferlist&& bl, int fadvise_flags, | |
981 | Context *on_finish) { | |
982 | CephContext *cct = m_image_ctx.cct; | |
983 | ||
984 | ldout(cct, 20) << "aio_writesame" << dendl; | |
985 | ||
986 | utime_t now = ceph_clock_now(); | |
987 | Extents ws_extents = {{offset, length}}; | |
988 | m_perfcounter->inc(l_librbd_pwl_ws, 1); | |
989 | ceph_assert(m_initialized); | |
990 | ||
991 | /* A write same request is also a write request. The key difference is the | |
992 | * write same data buffer is shorter than the extent of the request. The full | |
993 | * extent will be used in the block guard, and appear in | |
994 | * m_blocks_to_log_entries_map. The data buffer allocated for the WS is only | |
995 | * as long as the length of the bl here, which is the pattern that's repeated | |
996 | * in the image for the entire length of this WS. Read hits and flushing of | |
997 | * write sames are different than normal writes. */ | |
998 | C_WriteSameRequestT *ws_req = | |
999 | m_builder->create_writesame_request(*this, now, std::move(ws_extents), std::move(bl), | |
1000 | fadvise_flags, m_lock, m_perfcounter, on_finish); | |
1001 | m_perfcounter->inc(l_librbd_pwl_ws_bytes, ws_req->image_extents_summary.total_bytes); | |
1002 | ||
1003 | /* The lambda below will be called when the block guard for all | |
1004 | * blocks affected by this write is obtained */ | |
1005 | GuardedRequestFunctionContext *guarded_ctx = | |
1006 | new GuardedRequestFunctionContext([this, ws_req](GuardedRequestFunctionContext &guard_ctx) { | |
1007 | ws_req->blockguard_acquired(guard_ctx); | |
1008 | alloc_and_dispatch_io_req(ws_req); | |
1009 | }); | |
1010 | ||
1011 | detain_guarded_request(ws_req, guarded_ctx, false); | |
1012 | } | |
1013 | ||
1014 | template <typename I> | |
1015 | void AbstractWriteLog<I>::compare_and_write(Extents &&image_extents, | |
1016 | bufferlist&& cmp_bl, | |
1017 | bufferlist&& bl, | |
1018 | uint64_t *mismatch_offset, | |
1019 | int fadvise_flags, | |
1020 | Context *on_finish) { | |
1021 | ldout(m_image_ctx.cct, 20) << dendl; | |
1022 | ||
1023 | utime_t now = ceph_clock_now(); | |
1024 | m_perfcounter->inc(l_librbd_pwl_cmp, 1); | |
1025 | ceph_assert(m_initialized); | |
1026 | ||
1027 | /* A compare and write request is also a write request. We only allocate | |
1028 | * resources and dispatch this write request if the compare phase | |
1029 | * succeeds. */ | |
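/* Sketch of the flow implemented below: read the full request range back
 * through the cache, compare it with cmp_bl, and only if the contents match
 * dispatch the request as an ordinary write; on a mismatch the offset of
 * the first differing byte is reported via *mismatch_offset and the request
 * completes with -EILSEQ without writing anything. */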
1030 | C_WriteRequestT *cw_req = | |
1031 | m_builder->create_comp_and_write_request( | |
1032 | *this, now, std::move(image_extents), std::move(cmp_bl), std::move(bl), | |
1033 | mismatch_offset, fadvise_flags, m_lock, m_perfcounter, on_finish); | |
1034 | m_perfcounter->inc(l_librbd_pwl_cmp_bytes, cw_req->image_extents_summary.total_bytes); | |
1035 | ||
1036 | /* The lambda below will be called when the block guard for all | |
1037 | * blocks affected by this write is obtained */ | |
1038 | GuardedRequestFunctionContext *guarded_ctx = | |
1039 | new GuardedRequestFunctionContext([this, cw_req](GuardedRequestFunctionContext &guard_ctx) { | |
1040 | cw_req->blockguard_acquired(guard_ctx); | |
1041 | ||
1042 | auto read_complete_ctx = new LambdaContext( | |
1043 | [this, cw_req](int r) { | |
1044 | ldout(m_image_ctx.cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id | |
1045 | << "cw_req=" << cw_req << dendl; | |
1046 | ||
1047 | /* Compare read_bl to cmp_bl to determine if this will produce a write */ | |
1048 | buffer::list aligned_read_bl; | |
1049 | if (cw_req->cmp_bl.length() < cw_req->read_bl.length()) { | |
1050 | aligned_read_bl.substr_of(cw_req->read_bl, 0, cw_req->cmp_bl.length()); | |
1051 | } | |
1052 | if (cw_req->cmp_bl.contents_equal(cw_req->read_bl) || | |
1053 | cw_req->cmp_bl.contents_equal(aligned_read_bl)) { | |
1054 | /* Compare phase succeeds. Begin write */ | |
1055 | ldout(m_image_ctx.cct, 5) << " cw_req=" << cw_req << " compare matched" << dendl; | |
1056 | cw_req->compare_succeeded = true; | |
1057 | *cw_req->mismatch_offset = 0; | |
1058 | /* Continue with this request as a write. Blockguard release and | |
1059 | * user request completion handled as if this were a plain | |
1060 | * write. */ | |
1061 | alloc_and_dispatch_io_req(cw_req); | |
1062 | } else { | |
1063 | /* Compare phase fails. The compare-and-write ends now. */ | |
1064 | ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " compare failed" << dendl; | |
1065 | /* Bufferlist doesn't tell us where they differed, so we'll have to determine that here */ | |
1066 | uint64_t bl_index = 0; | |
1067 | for (bl_index = 0; bl_index < cw_req->cmp_bl.length(); bl_index++) { | |
1068 | if (cw_req->cmp_bl[bl_index] != cw_req->read_bl[bl_index]) { | |
1069 | ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " mismatch at " << bl_index << dendl; | |
1070 | break; | |
1071 | } | |
1072 | } | |
1073 | cw_req->compare_succeeded = false; | |
1074 | *cw_req->mismatch_offset = bl_index; | |
1075 | cw_req->complete_user_request(-EILSEQ); | |
1076 | cw_req->release_cell(); | |
1077 | cw_req->complete(0); | |
1078 | } | |
1079 | }); | |
1080 | ||
1081 | /* Read phase of comp-and-write must read through RWL */ | |
1082 | Extents image_extents_copy = cw_req->image_extents; | |
1083 | read(std::move(image_extents_copy), &cw_req->read_bl, cw_req->fadvise_flags, read_complete_ctx); | |
1084 | }); | |
1085 | ||
1086 | detain_guarded_request(cw_req, guarded_ctx, false); | |
1087 | } | |
1088 | ||
1089 | template <typename I> | |
1090 | void AbstractWriteLog<I>::flush(Context *on_finish) { | |
1091 | internal_flush(false, on_finish); | |
1092 | } | |
1093 | ||
1094 | template <typename I> | |
1095 | void AbstractWriteLog<I>::invalidate(Context *on_finish) { | |
1096 | internal_flush(true, on_finish); | |
1097 | } | |
1098 | ||
1099 | template <typename I> | |
1100 | CephContext *AbstractWriteLog<I>::get_context() { | |
1101 | return m_image_ctx.cct; | |
1102 | } | |
1103 | ||
1104 | template <typename I> | |
1105 | BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req) | |
1106 | { | |
1107 | CephContext *cct = m_image_ctx.cct; | |
1108 | BlockGuardCell *cell; | |
1109 | ||
1110 | ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); | |
1111 | ldout(cct, 20) << dendl; | |
1112 | ||
1113 | int r = m_write_log_guard.detain(req.block_extent, &req, &cell); | |
1114 | ceph_assert(r>=0); | |
1115 | if (r > 0) { | |
1116 | ldout(cct, 20) << "detaining guarded request due to in-flight requests: " | |
1117 | << "req=" << req << dendl; | |
1118 | return nullptr; | |
1119 | } | |
1120 | ||
1121 | ldout(cct, 20) << "in-flight request cell: " << cell << dendl; | |
1122 | return cell; | |
1123 | } | |
1124 | ||
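/* Barrier handling: while a barrier request (e.g. an aio_flush) holds the
 * guard, every later request is parked on m_awaiting_barrier instead of
 * being detained in the block guard; release_guarded_request() drains that
 * queue once the barrier cell is released. */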
1125 | template <typename I> | |
1126 | BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_barrier_helper( | |
1127 | GuardedRequest &req) | |
1128 | { | |
1129 | BlockGuardCell *cell = nullptr; | |
1130 | ||
1131 | ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); | |
1132 | ldout(m_image_ctx.cct, 20) << dendl; | |
1133 | ||
1134 | if (m_barrier_in_progress) { | |
1135 | req.guard_ctx->state.queued = true; | |
1136 | m_awaiting_barrier.push_back(req); | |
1137 | } else { | |
1138 | bool barrier = req.guard_ctx->state.barrier; | |
1139 | if (barrier) { | |
1140 | m_barrier_in_progress = true; | |
1141 | req.guard_ctx->state.current_barrier = true; | |
1142 | } | |
1143 | cell = detain_guarded_request_helper(req); | |
1144 | if (barrier) { | |
1145 | /* Only non-null if the barrier acquires the guard now */ | |
1146 | m_barrier_cell = cell; | |
1147 | } | |
1148 | } | |
1149 | ||
1150 | return cell; | |
1151 | } | |
1152 | ||
1153 | template <typename I> | |
1154 | void AbstractWriteLog<I>::detain_guarded_request( | |
1155 | C_BlockIORequestT *request, | |
1156 | GuardedRequestFunctionContext *guarded_ctx, | |
1157 | bool is_barrier) | |
1158 | { | |
1159 | BlockExtent extent; | |
1160 | if (request) { | |
1161 | extent = request->image_extents_summary.block_extent(); | |
1162 | } else { | |
1163 | extent = block_extent(whole_volume_extent()); | |
1164 | } | |
1165 | auto req = GuardedRequest(extent, guarded_ctx, is_barrier); | |
1166 | BlockGuardCell *cell = nullptr; | |
1167 | ||
1168 | ldout(m_image_ctx.cct, 20) << dendl; | |
1169 | { | |
1170 | std::lock_guard locker(m_blockguard_lock); | |
1171 | cell = detain_guarded_request_barrier_helper(req); | |
1172 | } | |
1173 | if (cell) { | |
1174 | req.guard_ctx->cell = cell; | |
1175 | req.guard_ctx->complete(0); | |
1176 | } | |
1177 | } | |
1178 | ||
1179 | template <typename I> | |
1180 | void AbstractWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell) | |
1181 | { | |
1182 | CephContext *cct = m_image_ctx.cct; | |
1183 | WriteLogGuard::BlockOperations block_reqs; | |
1184 | ldout(cct, 20) << "released_cell=" << released_cell << dendl; | |
1185 | ||
1186 | { | |
1187 | std::lock_guard locker(m_blockguard_lock); | |
1188 | m_write_log_guard.release(released_cell, &block_reqs); | |
1189 | ||
1190 | for (auto &req : block_reqs) { | |
1191 | req.guard_ctx->state.detained = true; | |
1192 | BlockGuardCell *detained_cell = detain_guarded_request_helper(req); | |
1193 | if (detained_cell) { | |
1194 | if (req.guard_ctx->state.current_barrier) { | |
1195 | /* The current barrier is acquiring the block guard, so now we know its cell */ | |
1196 | m_barrier_cell = detained_cell; | |
1197 | /* detained_cell could be == released_cell here */ | |
1198 | ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl; | |
1199 | } | |
1200 | req.guard_ctx->cell = detained_cell; | |
1201 | m_work_queue.queue(req.guard_ctx); | |
1202 | } | |
1203 | } | |
1204 | ||
1205 | if (m_barrier_in_progress && (released_cell == m_barrier_cell)) { | |
1206 | ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl; | |
1207 | /* The released cell is the current barrier request */ | |
1208 | m_barrier_in_progress = false; | |
1209 | m_barrier_cell = nullptr; | |
1210 | /* Move waiting requests into the blockguard. Stop if there's another barrier */ | |
1211 | while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) { | |
1212 | auto &req = m_awaiting_barrier.front(); | |
1213 | ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl; | |
1214 | BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req); | |
1215 | if (detained_cell) { | |
1216 | req.guard_ctx->cell = detained_cell; | |
1217 | m_work_queue.queue(req.guard_ctx); | |
1218 | } | |
1219 | m_awaiting_barrier.pop_front(); | |
1220 | } | |
1221 | } | |
1222 | } | |
1223 | ||
1224 | ldout(cct, 20) << "exit" << dendl; | |
1225 | } | |
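// Note: an illustrative outline (not an exact trace) of how the barrier
// machinery above is intended to interleave:
//
//   detain_guarded_request(req_A)           -> cell acquired, req_A dispatched
//   detain_guarded_request(barrier_B)       -> m_barrier_in_progress = true;
//                                              gets a cell only if it doesn't
//                                              overlap an in-flight request
//   detain_guarded_request(req_C)           -> parked on m_awaiting_barrier
//   release_guarded_request(req_A's cell)   -> barrier_B may now acquire its
//                                              cell (m_barrier_cell)
//   release_guarded_request(m_barrier_cell) -> barrier cleared; m_awaiting_barrier
//                                              is drained until the next barrier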
1226 | ||
1227 | template <typename I> | |
1228 | void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain, | |
1229 | bool &appending, bool isRWL) | |
1230 | { | |
1231 | const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION | |
1232 | : MAX_WRITES_PER_SYNC_POINT; | |
1233 | { | |
1234 | std::lock_guard locker(m_lock); | |
1235 | if (!appending && m_appending) { | |
1236 | /* Another thread is appending */ | |
1237 | ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl; | |
1238 | return; | |
1239 | } | |
1240 | if (m_ops_to_append.size()) { | |
1241 | appending = true; | |
1242 | m_appending = true; | |
1243 | auto last_in_batch = m_ops_to_append.begin(); | |
1244 | unsigned int ops_to_append = m_ops_to_append.size(); | |
1245 | if (ops_to_append > OPS_APPENDED) { | |
1246 | ops_to_append = OPS_APPENDED; | |
1247 | } | |
1248 | std::advance(last_in_batch, ops_to_append); | |
1249 | ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch); | |
1250 | ops_remain = true; /* Always check again before leaving */ | |
20effc67 TL |
1251 | ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", remain " |
1252 | << m_ops_to_append.size() << dendl; | |
f67539c2 TL |
1253 | } else if (isRWL) { |
1254 | ops_remain = false; | |
1255 | if (appending) { | |
1256 | appending = false; | |
1257 | m_appending = false; | |
1258 | } | |
1259 | } | |
1260 | } | |
1261 | } | |
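// Note: append_scheduled() is meant to be driven in a loop by the single
// thread currently holding the m_appending role. An illustrative caller shape
// (append_batch() is a hypothetical placeholder for the alloc/append/complete
// steps performed by the derived classes):
//
//   GenericLogOperations ops;
//   bool ops_remain = false, appending = false;
//   do {
//     ops.clear();
//     append_scheduled(ops, ops_remain, appending, true);
//     if (!ops.empty()) {
//       append_batch(ops);
//     }
//   } while (ops_remain);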
1262 | ||
1263 | template <typename I> | |
a4b75251 | 1264 | void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req) |
f67539c2 TL |
1265 | { |
1266 | GenericLogOperations to_append(ops.begin(), ops.end()); | |
1267 | ||
a4b75251 | 1268 | schedule_append_ops(to_append, req); |
f67539c2 TL |
1269 | } |
1270 | ||
1271 | template <typename I> | |
a4b75251 | 1272 | void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req) |
f67539c2 TL |
1273 | { |
1274 | GenericLogOperations to_append { op }; | |
1275 | ||
a4b75251 | 1276 | schedule_append_ops(to_append, req); |
f67539c2 TL |
1277 | } |
1278 | ||
1279 | /* | |
1280 | * Complete a set of write ops with the result of append_op_entries. | |
1281 | */ | |
1282 | template <typename I> | |
1283 | void AbstractWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops, | |
1284 | const int result) | |
1285 | { | |
1286 | GenericLogEntries dirty_entries; | |
1287 | int published_reserves = 0; | |
1288 | ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl; | |
1289 | for (auto &op : ops) { | |
1290 | utime_t now = ceph_clock_now(); | |
1291 | auto log_entry = op->get_log_entry(); | |
1292 | log_entry->completed = true; | |
1293 | if (op->is_writing_op()) { | |
1294 | op->mark_log_entry_completed(); | |
1295 | dirty_entries.push_back(log_entry); | |
1296 | } | |
1297 | if (log_entry->is_write_entry()) { | |
1298 | release_ram(log_entry); | |
1299 | } | |
1300 | if (op->reserved_allocated()) { | |
1301 | published_reserves++; | |
1302 | } | |
1303 | { | |
1304 | std::lock_guard locker(m_lock); | |
1305 | m_unpublished_reserves -= published_reserves; | |
1306 | m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries); | |
1307 | } | |
1308 | op->complete(result); | |
1309 | m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t, | |
a4b75251 | 1310 | op->log_append_start_time - op->dispatch_time); |
f67539c2 TL |
1311 | m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time); |
1312 | m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist, | |
1313 | utime_t(now - op->dispatch_time).to_nsec(), | |
1314 | log_entry->ram_entry.write_bytes); | |
a4b75251 | 1315 | utime_t app_lat = op->log_append_comp_time - op->log_append_start_time; |
f67539c2 TL |
1316 | m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat); |
1317 | m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(), | |
1318 | log_entry->ram_entry.write_bytes); | |
a4b75251 | 1319 | m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time); |
f67539c2 TL |
1320 | } |
1321 | // New entries may be flushable | |
1322 | { | |
1323 | std::lock_guard locker(m_lock); | |
1324 | wake_up(); | |
1325 | } | |
1326 | } | |
1327 | ||
1328 | /** | |
1329 | * Dispatch as many deferred writes as possible | |
1330 | */ | |
1331 | template <typename I> | |
1332 | void AbstractWriteLog<I>::dispatch_deferred_writes(void) | |
1333 | { | |
1334 | C_BlockIORequestT *front_req = nullptr; /* req still on front of deferred list */ | |
1335 | C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */ | |
1336 | bool allocated = false; /* front_req allocate succeeded */ | |
1337 | bool cleared_dispatching_flag = false; | |
1338 | ||
1339 | /* If we can't become the dispatcher, we'll exit */ | |
1340 | { | |
1341 | std::lock_guard locker(m_lock); | |
1342 | if (m_dispatching_deferred_ops || | |
1343 | !m_deferred_ios.size()) { | |
1344 | return; | |
1345 | } | |
1346 | m_dispatching_deferred_ops = true; | |
1347 | } | |
1348 | ||
1349 | /* There are ops to dispatch, and this should be the only thread dispatching them */ | |
1350 | { | |
1351 | std::lock_guard deferred_dispatch(m_deferred_dispatch_lock); | |
1352 | do { | |
1353 | { | |
1354 | std::lock_guard locker(m_lock); | |
1355 | ceph_assert(m_dispatching_deferred_ops); | |
1356 | if (allocated) { | |
1357 | /* On the 2nd through (n-1)th time we get the lock, front_req->alloc_resources() will | |
1358 | * have succeeded, and we'll need to pop it off the deferred ops list | |
1359 | * here. */ | |
1360 | ceph_assert(front_req); | |
1361 | ceph_assert(!allocated_req); | |
1362 | m_deferred_ios.pop_front(); | |
1363 | allocated_req = front_req; | |
1364 | front_req = nullptr; | |
1365 | allocated = false; | |
1366 | } | |
1367 | ceph_assert(!allocated); | |
1368 | if (!allocated && front_req) { | |
1369 | /* front_req->alloc_resources() failed on the last iteration. | |
1370 | * We'll stop dispatching. */ | |
1371 | wake_up(); | |
1372 | front_req = nullptr; | |
1373 | ceph_assert(!cleared_dispatching_flag); | |
1374 | m_dispatching_deferred_ops = false; | |
1375 | cleared_dispatching_flag = true; | |
1376 | } else { | |
1377 | ceph_assert(!front_req); | |
1378 | if (m_deferred_ios.size()) { | |
1379 | /* New allocation candidate */ | |
1380 | front_req = m_deferred_ios.front(); | |
1381 | } else { | |
1382 | ceph_assert(!cleared_dispatching_flag); | |
1383 | m_dispatching_deferred_ops = false; | |
1384 | cleared_dispatching_flag = true; | |
1385 | } | |
1386 | } | |
1387 | } | |
1388 | /* Try allocating for front_req before we decide what to do with allocated_req | |
1389 | * (if any) */ | |
1390 | if (front_req) { | |
1391 | ceph_assert(!cleared_dispatching_flag); | |
1392 | allocated = front_req->alloc_resources(); | |
1393 | } | |
1394 | if (allocated_req && front_req && allocated) { | |
1395 | /* Push dispatch of the first allocated req to a wq */ | |
1396 | m_work_queue.queue(new LambdaContext( | |
1397 | [this, allocated_req](int r) { | |
1398 | allocated_req->dispatch(); | |
1399 | }), 0); | |
1400 | allocated_req = nullptr; | |
1401 | } | |
1402 | ceph_assert(!(allocated_req && front_req && allocated)); | |
1403 | ||
1404 | /* Continue while we're still considering the front of the deferred ops list */ | |
1405 | } while (front_req); | |
1406 | ceph_assert(!allocated); | |
1407 | } | |
1408 | ceph_assert(cleared_dispatching_flag); | |
1409 | ||
1410 | /* If any deferred requests were allocated, the last one will still be in allocated_req */ | |
1411 | if (allocated_req) { | |
1412 | allocated_req->dispatch(); | |
1413 | } | |
1414 | } | |
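// Note, roughly: only one caller at a time may hold m_dispatching_deferred_ops;
// each pass tries front_req->alloc_resources() outside m_lock, hands an
// already-allocated request off to the work queue as soon as the next one also
// allocates, and dispatches the final allocated request inline after the loop.
// The first allocation failure stops dispatching and leaves that request at
// the front of m_deferred_ios for a later pass.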
1415 | ||
1416 | /** | |
1417 | * Releases the lanes used by this write, and attempts to dispatch the next | |
1418 | * deferred write | |
1419 | */ | |
1420 | template <typename I> | |
1421 | void AbstractWriteLog<I>::release_write_lanes(C_BlockIORequestT *req) | |
1422 | { | |
1423 | { | |
1424 | std::lock_guard locker(m_lock); | |
1425 | m_free_lanes += req->image_extents.size(); | |
1426 | } | |
1427 | dispatch_deferred_writes(); | |
1428 | } | |
1429 | ||
1430 | /** | |
1431 | * Attempts to allocate log resources for a write. Write is dispatched if | |
1432 | * resources are available, or queued if they aren't. | |
1433 | */ | |
1434 | template <typename I> | |
1435 | void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req) | |
1436 | { | |
1437 | bool dispatch_here = false; | |
1438 | ||
1439 | { | |
1440 | /* If there are already deferred writes, queue behind them for resources */ | |
1441 | { | |
1442 | std::lock_guard locker(m_lock); | |
1443 | dispatch_here = m_deferred_ios.empty(); | |
1444 | // Only a flush req has total_bytes set to the max uint64 (so the cast below is safe) | |
a4b75251 TL |
1445 | if (req->image_extents_summary.total_bytes == |
1446 | std::numeric_limits<uint64_t>::max() && | |
1447 | static_cast<C_FlushRequestT *>(req)->internal == true) { | |
f67539c2 TL |
1448 | dispatch_here = true; |
1449 | } | |
1450 | } | |
1451 | if (dispatch_here) { | |
1452 | dispatch_here = req->alloc_resources(); | |
1453 | } | |
1454 | if (dispatch_here) { | |
1455 | ldout(m_image_ctx.cct, 20) << "dispatching" << dendl; | |
1456 | req->dispatch(); | |
1457 | } else { | |
1458 | req->deferred(); | |
1459 | { | |
1460 | std::lock_guard locker(m_lock); | |
1461 | m_deferred_ios.push_back(req); | |
1462 | } | |
1463 | ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl; | |
1464 | dispatch_deferred_writes(); | |
1465 | } | |
1466 | } | |
1467 | } | |
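// Note: a request is dispatched inline only when nothing is already deferred
// (an internal flush request may bypass that queue check, but still has to
// pass alloc_resources()); otherwise it is marked deferred, appended to
// m_deferred_ios, and dispatch_deferred_writes() is kicked to drain the queue
// in order.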
1468 | ||
1469 | template <typename I> | |
a4b75251 TL |
1470 | bool AbstractWriteLog<I>::check_allocation( |
1471 | C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied, | |
1472 | uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries, | |
1473 | uint32_t num_unpublished_reserves) { | |
f67539c2 TL |
1474 | bool alloc_succeeds = true; |
1475 | bool no_space = false; | |
1476 | { | |
1477 | std::lock_guard locker(m_lock); | |
1478 | if (m_free_lanes < num_lanes) { | |
f67539c2 TL |
1479 | ldout(m_image_ctx.cct, 20) << "not enough free lanes (need " |
1480 | << num_lanes | |
1481 | << ", have " << m_free_lanes << ") " | |
1482 | << *req << dendl; | |
1483 | alloc_succeeds = false; | |
1484 | /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */ | |
1485 | } | |
1486 | if (m_free_log_entries < num_log_entries) { | |
f67539c2 TL |
1487 | ldout(m_image_ctx.cct, 20) << "not enough free entries (need " |
1488 | << num_log_entries | |
1489 | << ", have " << m_free_log_entries << ") " | |
1490 | << *req << dendl; | |
1491 | alloc_succeeds = false; | |
1492 | no_space = true; /* Entries must be retired */ | |
1493 | } | |
1494 | /* Don't attempt buffer allocation if we've exceeded the "full" threshold */ | |
a4b75251 | 1495 | if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) { |
20effc67 TL |
1496 | ldout(m_image_ctx.cct, 20) << "Waiting for allocation cap (cap=" |
1497 | << m_bytes_allocated_cap | |
1498 | << ", allocated=" << m_bytes_allocated | |
1499 | << ") in write [" << *req << "]" << dendl; | |
f67539c2 TL |
1500 | alloc_succeeds = false; |
1501 | no_space = true; /* Entries must be retired */ | |
1502 | } | |
1503 | } | |
1504 | ||
1505 | if (alloc_succeeds) { | |
1506 | reserve_cache(req, alloc_succeeds, no_space); | |
1507 | } | |
1508 | ||
1509 | if (alloc_succeeds) { | |
1510 | std::lock_guard locker(m_lock); | |
1511 | /* We need one free log entry per extent (each is a separate entry), and | |
1512 | * one free "lane" for remote replication. */ | |
1513 | if ((m_free_lanes >= num_lanes) && | |
a4b75251 TL |
1514 | (m_free_log_entries >= num_log_entries) && |
1515 | (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) { | |
f67539c2 TL |
1516 | m_free_lanes -= num_lanes; |
1517 | m_free_log_entries -= num_log_entries; | |
1518 | m_unpublished_reserves += num_unpublished_reserves; | |
1519 | m_bytes_allocated += bytes_allocated; | |
1520 | m_bytes_cached += bytes_cached; | |
1521 | m_bytes_dirty += bytes_dirtied; | |
f67539c2 TL |
1522 | } else { |
1523 | alloc_succeeds = false; | |
1524 | } | |
1525 | } | |
1526 | ||
1527 | if (!alloc_succeeds && no_space) { | |
1528 | /* Expedite flushing and/or retiring */ | |
1529 | std::lock_guard locker(m_lock); | |
1530 | m_alloc_failed_since_retire = true; | |
1531 | m_last_alloc_fail = ceph_clock_now(); | |
1532 | } | |
1533 | ||
1534 | return alloc_succeeds; | |
1535 | } | |
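// Illustrative "no space" example: with m_bytes_allocated_cap = 1 GiB and
// m_bytes_allocated = 1 GiB - 4 KiB, a request needing another 64 KiB fails
// the cap check above, so alloc_succeeds = false and no_space = true,
// m_alloc_failed_since_retire is set to expedite flushing/retiring, and the
// request is expected to be retried later from the deferred queue.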
1536 | ||
1537 | template <typename I> | |
1538 | C_FlushRequest<AbstractWriteLog<I>>* AbstractWriteLog<I>::make_flush_req(Context *on_finish) { | |
1539 | utime_t flush_begins = ceph_clock_now(); | |
1540 | bufferlist bl; | |
1541 | auto *flush_req = | |
1542 | new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}), | |
1543 | std::move(bl), 0, m_lock, m_perfcounter, on_finish); | |
1544 | ||
1545 | return flush_req; | |
1546 | } | |
1547 | ||
1548 | template <typename I> | |
1549 | void AbstractWriteLog<I>::wake_up() { | |
1550 | CephContext *cct = m_image_ctx.cct; | |
1551 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1552 | ||
1553 | if (!m_wake_up_enabled) { | |
1554 | // wake_up is disabled during shutdown after flushing completes | |
1555 | ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl; | |
1556 | return; | |
1557 | } | |
1558 | ||
1559 | if (m_wake_up_requested && m_wake_up_scheduled) { | |
1560 | return; | |
1561 | } | |
1562 | ||
1563 | ldout(cct, 20) << dendl; | |
1564 | ||
1565 | /* Wake-up can be requested while it's already scheduled */ | |
1566 | m_wake_up_requested = true; | |
1567 | ||
1568 | /* Wake-up cannot be scheduled if it's already scheduled */ | |
1569 | if (m_wake_up_scheduled) { | |
1570 | return; | |
1571 | } | |
1572 | m_wake_up_scheduled = true; | |
1573 | m_async_process_work++; | |
1574 | m_async_op_tracker.start_op(); | |
1575 | m_work_queue.queue(new LambdaContext( | |
1576 | [this](int r) { | |
1577 | process_work(); | |
1578 | m_async_op_tracker.finish_op(); | |
1579 | m_async_process_work--; | |
1580 | }), 0); | |
1581 | } | |
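// Note: m_wake_up_requested records that more work arrived, while
// m_wake_up_scheduled ensures at most one process_work() context is queued at
// a time; process_work() is expected to re-check/clear these flags so a
// request that races with a running pass still results in another pass.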
1582 | ||
1583 | template <typename I> | |
1584 | bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) { | |
1585 | CephContext *cct = m_image_ctx.cct; | |
1586 | ||
1587 | ldout(cct, 20) << "" << dendl; | |
1588 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1589 | ||
1590 | if (m_invalidating) { | |
1591 | return true; | |
1592 | } | |
1593 | ||
1594 | /* For OWB we can flush entries with the same sync gen number (writes between | |
1595 | * aio_flush() calls) concurrently. Here we'll consider an entry flushable if | |
1596 | * its sync gen number is <= the lowest sync gen number carried by all the | |
1597 | * entries currently flushing. | |
1598 | * | |
1599 | * If the entry considered here bears a sync gen number lower than a | |
1600 | * previously flushed entry, the application had to have submitted the write | |
1601 | * bearing the higher gen number before the write with the lower gen number | |
1602 | * completed. So, flushing these concurrently is OK. | |
1603 | * | |
1604 | * If the entry considered here bears a sync gen number higher than a | |
1605 | * currently flushing entry, the write with the lower gen number may have | |
1606 | * completed to the application before the write with the higher sync gen | |
1607 | * number was submitted, and the application may rely on that completion | |
1608 | * order for volume consistency. In this case the entry will not be | |
1609 | * considered flushable until all the entries bearing lower sync gen numbers | |
1610 | * finish flushing. | |
1611 | */ | |
1612 | ||
1613 | if (m_flush_ops_in_flight && | |
1614 | (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) { | |
1615 | return false; | |
1616 | } | |
1617 | ||
1618 | return (log_entry->can_writeback() && | |
1619 | (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) && | |
1620 | (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT)); | |
1621 | } | |
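// Illustrative example: with dirty entries carrying sync gen numbers 7, 7 and
// 8, and gen-7 entries already flushing (m_lowest_flushing_sync_gen == 7),
// another gen-7 entry is still flushable (subject to the in-flight limits),
// while the gen-8 entry is held back until the gen-7 flushes complete.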
1622 | ||
1623 | template <typename I> | |
20effc67 TL |
1624 | void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry, |
1625 | GuardedRequestFunctionContext *guarded_ctx) { | |
1626 | ldout(m_image_ctx.cct, 20) << dendl; | |
f67539c2 | 1627 | |
20effc67 TL |
1628 | BlockExtent extent; |
1629 | if (log_entry->is_sync_point()) { | |
1630 | extent = block_extent(whole_volume_extent()); | |
1631 | } else { | |
1632 | extent = log_entry->ram_entry.block_extent(); | |
f67539c2 | 1633 | } |
20effc67 TL |
1634 | |
1635 | auto req = GuardedRequest(extent, guarded_ctx, false); | |
1636 | BlockGuardCell *cell = nullptr; | |
1637 | ||
1638 | { | |
1639 | std::lock_guard locker(m_flush_guard_lock); | |
1640 | m_flush_guard.detain(req.block_extent, &req, &cell); | |
1641 | } | |
1642 | if (cell) { | |
1643 | req.guard_ctx->cell = cell; | |
1644 | m_image_ctx.op_work_queue->queue(req.guard_ctx, 0); | |
1645 | } | |
1646 | } | |
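// Note: the flush guard serializes writeback per block extent. A sync point
// entry detains the whole-volume extent, so its writeback waits for every
// in-flight entry and blocks later ones; ordinary entries only collide with
// overlapping extents. The cell taken here is released in the completion
// path built by construct_flush_entry() below.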
1647 | ||
1648 | template <typename I> | |
1649 | Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry, | |
1650 | bool invalidating) { | |
1651 | ldout(m_image_ctx.cct, 20) << "" << dendl; | |
f67539c2 TL |
1652 | |
1653 | /* Flush write completion action */ | |
a4b75251 | 1654 | utime_t writeback_start_time = ceph_clock_now(); |
f67539c2 | 1655 | Context *ctx = new LambdaContext( |
a4b75251 TL |
1656 | [this, log_entry, writeback_start_time, invalidating](int r) { |
1657 | utime_t writeback_comp_time = ceph_clock_now(); | |
1658 | m_perfcounter->tinc(l_librbd_pwl_writeback_latency, | |
1659 | writeback_comp_time - writeback_start_time); | |
f67539c2 TL |
1660 | { |
1661 | std::lock_guard locker(m_lock); | |
1662 | if (r < 0) { | |
1663 | lderr(m_image_ctx.cct) << "failed to flush log entry" | |
1664 | << cpp_strerror(r) << dendl; | |
1665 | m_dirty_log_entries.push_front(log_entry); | |
1666 | } else { | |
1667 | ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty()); | |
1668 | log_entry->set_flushed(true); | |
1669 | m_bytes_dirty -= log_entry->bytes_dirty(); | |
1670 | sync_point_writer_flushed(log_entry->get_sync_point_entry()); | |
1671 | ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry | |
1672 | << " invalidating=" << invalidating | |
1673 | << dendl; | |
1674 | } | |
1675 | m_flush_ops_in_flight -= 1; | |
1676 | m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes; | |
1677 | wake_up(); | |
1678 | } | |
1679 | }); | |
1680 | /* Flush through lower cache before completing */ | |
1681 | ctx = new LambdaContext( | |
20effc67 TL |
1682 | [this, ctx, log_entry](int r) { |
1683 | { | |
1684 | ||
1685 | WriteLogGuard::BlockOperations block_reqs; | |
1686 | BlockGuardCell *detained_cell = nullptr; | |
1687 | ||
1688 | std::lock_guard locker{m_flush_guard_lock}; | |
1689 | m_flush_guard.release(log_entry->m_cell, &block_reqs); | |
1690 | ||
1691 | for (auto &req : block_reqs) { | |
1692 | m_flush_guard.detain(req.block_extent, &req, &detained_cell); | |
1693 | if (detained_cell) { | |
1694 | req.guard_ctx->cell = detained_cell; | |
1695 | m_image_ctx.op_work_queue->queue(req.guard_ctx, 0); | |
1696 | } | |
1697 | } | |
1698 | } | |
1699 | ||
f67539c2 TL |
1700 | if (r < 0) { |
1701 | lderr(m_image_ctx.cct) << "failed to flush log entry" | |
1702 | << cpp_strerror(r) << dendl; | |
1703 | ctx->complete(r); | |
1704 | } else { | |
1705 | m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx); | |
1706 | } | |
1707 | }); | |
1708 | return ctx; | |
1709 | } | |
1710 | ||
1711 | template <typename I> | |
1712 | void AbstractWriteLog<I>::process_writeback_dirty_entries() { | |
1713 | CephContext *cct = m_image_ctx.cct; | |
1714 | bool all_clean = false; | |
1715 | int flushed = 0; | |
a4b75251 | 1716 | bool has_write_entry = false; |
f67539c2 TL |
1717 | |
1718 | ldout(cct, 20) << "Look for dirty entries" << dendl; | |
1719 | { | |
1720 | DeferredContexts post_unlock; | |
a4b75251 TL |
1721 | GenericLogEntries entries_to_flush; |
1722 | ||
f67539c2 | 1723 | std::shared_lock entry_reader_locker(m_entry_reader_lock); |
a4b75251 | 1724 | std::lock_guard locker(m_lock); |
f67539c2 | 1725 | while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) { |
f67539c2 TL |
1726 | if (m_shutting_down) { |
1727 | ldout(cct, 5) << "Flush during shutdown suppressed" << dendl; | |
1728 | /* Do flush complete only when all flush ops are finished */ | |
1729 | all_clean = !m_flush_ops_in_flight; | |
1730 | break; | |
1731 | } | |
1732 | if (m_dirty_log_entries.empty()) { | |
1733 | ldout(cct, 20) << "Nothing new to flush" << dendl; | |
1734 | /* Do flush complete only when all flush ops are finished */ | |
1735 | all_clean = !m_flush_ops_in_flight; | |
1736 | break; | |
1737 | } | |
a4b75251 | 1738 | |
f67539c2 TL |
1739 | auto candidate = m_dirty_log_entries.front(); |
1740 | bool flushable = can_flush_entry(candidate); | |
1741 | if (flushable) { | |
a4b75251 | 1742 | entries_to_flush.push_back(candidate); |
f67539c2 | 1743 | flushed++; |
a4b75251 TL |
1744 | if (!has_write_entry) |
1745 | has_write_entry = candidate->is_write_entry(); | |
f67539c2 | 1746 | m_dirty_log_entries.pop_front(); |
20effc67 TL |
1747 | |
1748 | // To track the candidate, account for it in m_flush_ops_in_flight here | |
1749 | { | |
1750 | if (!m_flush_ops_in_flight || | |
1751 | (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) { | |
1752 | m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number; | |
1753 | } | |
1754 | m_flush_ops_in_flight += 1; | |
1755 | /* For write same this is the bytes affected by the flush op, not the bytes transferred */ | |
1756 | m_flush_bytes_in_flight += candidate->ram_entry.write_bytes; | |
1757 | } | |
f67539c2 TL |
1758 | } else { |
1759 | ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl; | |
1760 | break; | |
1761 | } | |
1762 | } | |
a4b75251 TL |
1763 | |
1764 | construct_flush_entries(entries_to_flush, post_unlock, has_write_entry); | |
f67539c2 TL |
1765 | } |
1766 | ||
1767 | if (all_clean) { | |
1768 | /* All flushing complete, drain outside lock */ | |
1769 | Contexts flush_contexts; | |
1770 | { | |
1771 | std::lock_guard locker(m_lock); | |
1772 | flush_contexts.swap(m_flush_complete_contexts); | |
1773 | } | |
1774 | finish_contexts(m_image_ctx.cct, flush_contexts, 0); | |
1775 | } | |
1776 | } | |
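// Note: candidates are selected and accounted (m_flush_ops_in_flight,
// m_flush_bytes_in_flight) while m_lock and the entry reader lock are held;
// the writeback contexts built by construct_flush_entries() are expected to
// run from post_unlock once both locks have been dropped.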
1777 | ||
1778 | /* Returns true if the specified SyncPointLogEntry is considered flushed, and | |
1779 | * the log will be updated to reflect this. */ | |
1780 | template <typename I> | |
1781 | bool AbstractWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry) | |
1782 | { | |
1783 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1784 | ceph_assert(log_entry); | |
1785 | ||
1786 | if ((log_entry->writes_flushed == log_entry->writes) && | |
1787 | log_entry->completed && log_entry->prior_sync_point_flushed && | |
1788 | log_entry->next_sync_point_entry) { | |
1789 | ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point=" | |
1790 | << *log_entry << dendl; | |
1791 | log_entry->next_sync_point_entry->prior_sync_point_flushed = true; | |
1792 | /* Don't move the flushed sync gen num backwards. */ | |
1793 | if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) { | |
1794 | m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number; | |
1795 | } | |
1796 | m_async_op_tracker.start_op(); | |
1797 | m_work_queue.queue(new LambdaContext( | |
a4b75251 | 1798 | [this, next = std::move(log_entry->next_sync_point_entry)](int r) { |
f67539c2 TL |
1799 | bool handled_by_next; |
1800 | { | |
1801 | std::lock_guard locker(m_lock); | |
a4b75251 | 1802 | handled_by_next = handle_flushed_sync_point(std::move(next)); |
f67539c2 TL |
1803 | } |
1804 | if (!handled_by_next) { | |
1805 | persist_last_flushed_sync_gen(); | |
1806 | } | |
1807 | m_async_op_tracker.finish_op(); | |
1808 | })); | |
1809 | return true; | |
1810 | } | |
1811 | return false; | |
1812 | } | |
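// Note, roughly: when a sync point is found fully flushed, the same check is
// re-run for its successor from the work queue rather than recursively on
// this stack; persist_last_flushed_sync_gen() runs once the chain reaches a
// sync point that can't yet be considered flushed.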
1813 | ||
1814 | template <typename I> | |
1815 | void AbstractWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry) | |
1816 | { | |
1817 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1818 | ceph_assert(log_entry); | |
1819 | log_entry->writes_flushed++; | |
1820 | ||
1821 | /* If this entry might be completely flushed, look closer */ | |
1822 | if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) { | |
1823 | ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point=" | |
1824 | << *log_entry << dendl; | |
1825 | handle_flushed_sync_point(log_entry); | |
1826 | } | |
1827 | } | |
1828 | ||
1829 | /* Make a new sync point and flush the previous during initialization, when there may or may | |
1830 | * not be a previous sync point */ | |
1831 | template <typename I> | |
1832 | void AbstractWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) { | |
1833 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1834 | ceph_assert(!m_initialized); /* Don't use this after init */ | |
1835 | ||
1836 | if (!m_current_sync_point) { | |
1837 | /* First sync point since start */ | |
1838 | new_sync_point(later); | |
1839 | } else { | |
1840 | flush_new_sync_point(nullptr, later); | |
1841 | } | |
1842 | } | |
1843 | ||
1844 | /** | |
1845 | * Begin a new sync point | |
1846 | */ | |
1847 | template <typename I> | |
1848 | void AbstractWriteLog<I>::new_sync_point(DeferredContexts &later) { | |
1849 | CephContext *cct = m_image_ctx.cct; | |
1850 | std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point; | |
1851 | std::shared_ptr<SyncPoint> new_sync_point; | |
1852 | ldout(cct, 20) << dendl; | |
1853 | ||
1854 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1855 | ||
1856 | /* The first time this is called, if this is a newly created log, | |
1857 | * this makes the first sync gen number we'll use 1. On the first | |
1858 | * call for a re-opened log m_current_sync_gen will be the highest | |
1859 | * gen number from all the sync point entries found in the re-opened | |
1860 | * log, and this advances to the next sync gen number. */ | |
1861 | ++m_current_sync_gen; | |
1862 | ||
1863 | new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct); | |
1864 | m_current_sync_point = new_sync_point; | |
1865 | ||
1866 | /* If this log has been re-opened, old_sync_point will initially be | |
1867 | * nullptr, but m_current_sync_gen may not be zero. */ | |
1868 | if (old_sync_point) { | |
1869 | new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num); | |
1870 | m_perfcounter->hinc(l_librbd_pwl_syncpoint_hist, | |
1871 | old_sync_point->log_entry->writes, | |
1872 | old_sync_point->log_entry->bytes); | |
1873 | /* This sync point will acquire no more sub-ops. Activation needs | |
1874 | * to acquire m_lock, so defer to later */ | |
1875 | later.add(new LambdaContext( | |
1876 | [this, old_sync_point](int r) { | |
1877 | old_sync_point->prior_persisted_gather_activate(); | |
1878 | })); | |
1879 | } | |
1880 | ||
1881 | new_sync_point->prior_persisted_gather_set_finisher(); | |
1882 | ||
1883 | if (old_sync_point) { | |
1884 | ldout(cct,6) << "new sync point = [" << *m_current_sync_point | |
1885 | << "], prior = [" << *old_sync_point << "]" << dendl; | |
1886 | } else { | |
1887 | ldout(cct,6) << "first sync point = [" << *m_current_sync_point | |
1888 | << "]" << dendl; | |
1889 | } | |
1890 | } | |
1891 | ||
1892 | template <typename I> | |
1893 | void AbstractWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req, | |
1894 | DeferredContexts &later) { | |
1895 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1896 | ||
1897 | if (!flush_req) { | |
1898 | m_async_null_flush_finish++; | |
1899 | m_async_op_tracker.start_op(); | |
1900 | Context *flush_ctx = new LambdaContext([this](int r) { | |
1901 | m_async_null_flush_finish--; | |
1902 | m_async_op_tracker.finish_op(); | |
1903 | }); | |
1904 | flush_req = make_flush_req(flush_ctx); | |
1905 | flush_req->internal = true; | |
1906 | } | |
1907 | ||
1908 | /* Add a new sync point. */ | |
1909 | new_sync_point(later); | |
1910 | std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point; | |
1911 | ceph_assert(to_append); | |
1912 | ||
1913 | /* This flush request will append/persist the (now) previous sync point */ | |
1914 | flush_req->to_append = to_append; | |
1915 | ||
1916 | /* When the m_sync_point_persist Gather completes this sync point can be | |
1917 | * appended. The only sub for this Gather is the finisher Context for | |
1918 | * m_prior_log_entries_persisted, which records the result of the Gather in | |
1919 | * the sync point, and completes. TODO: Do we still need both of these | |
1920 | * Gathers?*/ | |
1921 | Context * ctx = new LambdaContext([this, flush_req](int r) { | |
1922 | ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req | |
1923 | << " sync point =" << flush_req->to_append | |
1924 | << ". Ready to persist." << dendl; | |
1925 | alloc_and_dispatch_io_req(flush_req); | |
1926 | }); | |
1927 | to_append->persist_gather_set_finisher(ctx); | |
1928 | ||
1929 | /* The m_sync_point_persist Gather has all the subs it will ever have, and | |
1930 | * now has its finisher. If the sub is already complete, activation will | |
1931 | * complete the Gather. The finisher will acquire m_lock, so we'll activate | |
1932 | * this when we release m_lock.*/ | |
1933 | later.add(new LambdaContext([this, to_append](int r) { | |
1934 | to_append->persist_gather_activate(); | |
1935 | })); | |
1936 | ||
1937 | /* The flush request completes when the sync point persists */ | |
1938 | to_append->add_in_on_persisted_ctxs(flush_req); | |
1939 | } | |
1940 | ||
1941 | template <typename I> | |
1942 | void AbstractWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, | |
1943 | DeferredContexts &later) { | |
1944 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1945 | ||
1946 | /* If there have been writes since the last sync point ... */ | |
1947 | if (m_current_sync_point->log_entry->writes) { | |
1948 | flush_new_sync_point(flush_req, later); | |
1949 | } else { | |
1950 | /* There have been no writes to the current sync point. */ | |
1951 | if (m_current_sync_point->earlier_sync_point) { | |
1952 | /* If previous sync point hasn't completed, complete this flush | |
1953 | * with the earlier sync point. No alloc or dispatch needed. */ | |
1954 | m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req); | |
1955 | } else { | |
1956 | /* The previous sync point has already completed and been | |
1957 | * appended. The current sync point has no writes, so this flush | |
1958 | * has nothing to wait for. This flush completes now. */ | |
1959 | later.add(flush_req); | |
1960 | } | |
1961 | } | |
1962 | } | |
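// Note (summary of the three cases above): a flush request either (a) creates
// and persists a new sync point when the current one has writes, (b)
// piggybacks on the still-pending earlier sync point when the current one is
// empty but its predecessor hasn't persisted yet, or (c) completes via
// `later` when there is nothing outstanding at all.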
1963 | ||
1964 | /* | |
1965 | * RWL internal flush - will actually flush the RWL. | |
1966 | * | |
1967 | * User flushes should arrive at aio_flush(), and only flush prior | |
1968 | * writes to all log replicas. | |
1969 | * | |
1970 | * Librbd internal flushes will arrive at flush(invalidate=false, | |
1971 | * discard=false), and traverse the block guard to ensure in-flight writes are | |
1972 | * flushed. | |
1973 | */ | |
1974 | template <typename I> | |
1975 | void AbstractWriteLog<I>::flush_dirty_entries(Context *on_finish) { | |
1976 | CephContext *cct = m_image_ctx.cct; | |
1977 | bool all_clean; | |
1978 | bool flushing; | |
1979 | bool stop_flushing; | |
1980 | ||
1981 | { | |
1982 | std::lock_guard locker(m_lock); | |
1983 | flushing = (0 != m_flush_ops_in_flight); | |
1984 | all_clean = m_dirty_log_entries.empty(); | |
1985 | stop_flushing = (m_shutting_down); | |
1986 | } | |
1987 | ||
1988 | if (!flushing && (all_clean || stop_flushing)) { | |
1989 | /* Complete without holding m_lock */ | |
1990 | if (all_clean) { | |
1991 | ldout(cct, 20) << "no dirty entries" << dendl; | |
1992 | } else { | |
1993 | ldout(cct, 5) << "flush during shutdown suppressed" << dendl; | |
1994 | } | |
1995 | on_finish->complete(0); | |
1996 | } else { | |
1997 | if (all_clean) { | |
1998 | ldout(cct, 5) << "flush ops still in progress" << dendl; | |
1999 | } else { | |
2000 | ldout(cct, 20) << "dirty entries remain" << dendl; | |
2001 | } | |
2002 | std::lock_guard locker(m_lock); | |
2003 | /* on_finish can't be completed yet */ | |
2004 | m_flush_complete_contexts.push_back(new LambdaContext( | |
2005 | [this, on_finish](int r) { | |
2006 | flush_dirty_entries(on_finish); | |
2007 | })); | |
2008 | wake_up(); | |
2009 | } | |
2010 | } | |
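// Note: when dirty entries or in-flight flush ops remain, on_finish is not
// completed here; a retry context is parked on m_flush_complete_contexts and
// wake_up() drives another writeback pass, which re-invokes
// flush_dirty_entries() until the log is clean (or shutdown suppresses
// further flushing).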
2011 | ||
2012 | template <typename I> | |
2013 | void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) { | |
2014 | ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl; | |
2015 | ||
2016 | if (m_perfcounter) { | |
2017 | if (invalidate) { | |
2018 | m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1); | |
2019 | } else { | |
a4b75251 | 2020 | m_perfcounter->inc(l_librbd_pwl_internal_flush, 1); |
f67539c2 TL |
2021 | } |
2022 | } | |
2023 | ||
2024 | /* May be called even if initialization fails */ | |
2025 | if (!m_initialized) { | |
2026 | ldout(m_image_ctx.cct, 05) << "never initialized" << dendl; | |
2027 | /* Deadlock if completed here */ | |
2028 | m_image_ctx.op_work_queue->queue(on_finish, 0); | |
2029 | return; | |
2030 | } | |
2031 | ||
2032 | /* Flush/invalidate must pass through block guard to ensure all layers of | |
2033 | * cache are consistently flush/invalidated. This ensures no in-flight write leaves | |
2034 | * some layers with valid regions, which may later produce inconsistent read | |
2035 | * results. */ | |
2036 | GuardedRequestFunctionContext *guarded_ctx = | |
2037 | new GuardedRequestFunctionContext( | |
2038 | [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) { | |
2039 | DeferredContexts on_exit; | |
2040 | ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl; | |
2041 | ceph_assert(guard_ctx.cell); | |
2042 | ||
2043 | Context *ctx = new LambdaContext( | |
2044 | [this, cell=guard_ctx.cell, invalidate, on_finish](int r) { | |
2045 | std::lock_guard locker(m_lock); | |
2046 | m_invalidating = false; | |
2047 | ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate=" | |
2048 | << invalidate << ")" << dendl; | |
2049 | if (m_log_entries.size()) { | |
2050 | ldout(m_image_ctx.cct, 1) << "m_log_entries.size()=" | |
20effc67 TL |
2051 | << m_log_entries.size() |
2052 | << ", front()=" << *m_log_entries.front() | |
f67539c2 TL |
2053 | << dendl; |
2054 | } | |
2055 | if (invalidate) { | |
2056 | ceph_assert(m_log_entries.size() == 0); | |
2057 | } | |
2058 | ceph_assert(m_dirty_log_entries.size() == 0); | |
2059 | m_image_ctx.op_work_queue->queue(on_finish, r); | |
2060 | release_guarded_request(cell); | |
2061 | }); | |
2062 | ctx = new LambdaContext( | |
2063 | [this, ctx, invalidate](int r) { | |
2064 | Context *next_ctx = ctx; | |
2065 | ldout(m_image_ctx.cct, 6) << "flush_dirty_entries finished" << dendl; | |
2066 | if (r < 0) { | |
2067 | /* Override on_finish status with this error */ | |
2068 | next_ctx = new LambdaContext([r, ctx](int _r) { | |
2069 | ctx->complete(r); | |
2070 | }); | |
2071 | } | |
2072 | if (invalidate) { | |
2073 | { | |
2074 | std::lock_guard locker(m_lock); | |
2075 | ceph_assert(m_dirty_log_entries.size() == 0); | |
2076 | ceph_assert(!m_invalidating); | |
2077 | ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl; | |
2078 | m_invalidating = true; | |
2079 | } | |
2080 | /* Discards all RWL entries */ | |
2081 | while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { } | |
2082 | next_ctx->complete(0); | |
2083 | } else { | |
2084 | { | |
2085 | std::lock_guard locker(m_lock); | |
2086 | ceph_assert(m_dirty_log_entries.size() == 0); | |
2087 | ceph_assert(!m_invalidating); | |
2088 | } | |
2089 | m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx); | |
2090 | } | |
2091 | }); | |
2092 | ctx = new LambdaContext( | |
2093 | [this, ctx](int r) { | |
2094 | flush_dirty_entries(ctx); | |
2095 | }); | |
2096 | std::lock_guard locker(m_lock); | |
2097 | /* Even if we're throwing everything away, we want the last entry to | |
2098 | * be a sync point so we can cleanly resume. | |
2099 | * | |
2100 | * Also, the blockguard only guarantees the replication of this op | |
2101 | * can't overlap with prior ops. It doesn't guarantee those are all | |
2102 | * completed and eligible for flush & retire, which we require here. | |
2103 | */ | |
2104 | auto flush_req = make_flush_req(ctx); | |
2105 | flush_new_sync_point_if_needed(flush_req, on_exit); | |
2106 | }); | |
2107 | detain_guarded_request(nullptr, guarded_ctx, true); | |
2108 | } | |
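// Note (outline of the guarded sequence above): a whole-volume barrier
// request detains the block guard; a flush request is created and a new sync
// point added if needed; flush_dirty_entries() waits for writeback of all
// dirty entries; with invalidate=true all remaining log entries are discarded
// via retire_entries(), otherwise the lower cache layer is flushed; finally
// the guard cell is released and on_finish completes.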
2109 | ||
2110 | template <typename I> | |
2111 | void AbstractWriteLog<I>::add_into_log_map(GenericWriteLogEntries &log_entries, | |
2112 | C_BlockIORequestT *req) { | |
2113 | req->copy_cache(); | |
2114 | m_blocks_to_log_entries.add_log_entries(log_entries); | |
2115 | } | |
2116 | ||
2117 | template <typename I> | |
2118 | bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) { | |
2119 | CephContext *cct = m_image_ctx.cct; | |
2120 | ||
2121 | ldout(cct, 20) << dendl; | |
2122 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
20effc67 | 2123 | ceph_assert(log_entry); |
f67539c2 TL |
2124 | return log_entry->can_retire(); |
2125 | } | |
2126 | ||
a4b75251 TL |
2127 | template <typename I> |
2128 | void AbstractWriteLog<I>::check_image_cache_state_clean() { | |
2129 | ceph_assert(m_deferred_ios.empty()); | |
20effc67 | 2130 | ceph_assert(m_ops_to_append.empty()); |
a4b75251 TL |
2131 | ceph_assert(m_async_flush_ops == 0); |
2132 | ceph_assert(m_async_append_ops == 0); | |
2133 | ceph_assert(m_dirty_log_entries.empty()); | |
2134 | ceph_assert(m_ops_to_flush.empty()); | |
2135 | ceph_assert(m_flush_ops_in_flight == 0); | |
2136 | ceph_assert(m_flush_bytes_in_flight == 0); | |
2137 | ceph_assert(m_bytes_dirty == 0); | |
2138 | ceph_assert(m_work_queue.empty()); | |
2139 | } | |
2140 | ||
f67539c2 TL |
2141 | } // namespace pwl |
2142 | } // namespace cache | |
2143 | } // namespace librbd | |
2144 | ||
2145 | template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>; |