]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
#include "AbstractWriteLog.h"
#include "include/buffer.h"
#include "include/Context.h"
#include "include/ceph_assert.h"
#include "common/deleter.h"
#include "common/dout.h"
#include "common/environment.h"
#include "common/errno.h"
#include "common/hostname.h"
#include "common/WorkQueue.h"
#include "common/Timer.h"
#include "common/perf_counters.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/cache/pwl/ImageCacheState.h"
#include "librbd/cache/pwl/LogEntry.h"
#include "librbd/plugin/Api.h"
#include <map>
#include <memory>
#include <vector>
23 | ||
24 | #undef dout_subsys | |
25 | #define dout_subsys ceph_subsys_rbd_pwl | |
26 | #undef dout_prefix | |
27 | #define dout_prefix *_dout << "librbd::cache::pwl::AbstractWriteLog: " << this \ | |
28 | << " " << __func__ << ": " | |
29 | ||
30 | namespace librbd { | |
31 | namespace cache { | |
32 | namespace pwl { | |
33 | ||
20effc67 | 34 | using namespace std; |
f67539c2 TL |
35 | using namespace librbd::cache::pwl; |
36 | ||
37 | typedef AbstractWriteLog<ImageCtx>::Extent Extent; | |
38 | typedef AbstractWriteLog<ImageCtx>::Extents Extents; | |
39 | ||
40 | template <typename I> | |
41 | AbstractWriteLog<I>::AbstractWriteLog( | |
42 | I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state, | |
43 | Builder<This> *builder, cache::ImageWritebackInterface& image_writeback, | |
44 | plugin::Api<I>& plugin_api) | |
45 | : m_builder(builder), | |
46 | m_write_log_guard(image_ctx.cct), | |
20effc67 TL |
47 | m_flush_guard(image_ctx.cct), |
48 | m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name( | |
49 | "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))), | |
f67539c2 TL |
50 | m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name( |
51 | "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))), | |
52 | m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name( | |
53 | "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))), | |
54 | m_thread_pool( | |
55 | image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool", | |
56 | "tp_pwl", 4, ""), | |
57 | m_cache_state(cache_state), | |
58 | m_image_ctx(image_ctx), | |
a4b75251 | 59 | m_log_pool_size(DEFAULT_POOL_SIZE), |
f67539c2 TL |
60 | m_image_writeback(image_writeback), |
61 | m_plugin_api(plugin_api), | |
62 | m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name( | |
63 | "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))), | |
64 | m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"), | |
65 | m_log_append_lock(ceph::make_mutex(pwl::unique_lock_name( | |
66 | "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))), | |
67 | m_lock(ceph::make_mutex(pwl::unique_lock_name( | |
68 | "librbd::cache::pwl::AbstractWriteLog::m_lock", this))), | |
69 | m_blocks_to_log_entries(image_ctx.cct), | |
70 | m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue", | |
71 | ceph::make_timespan( | |
72 | image_ctx.config.template get_val<uint64_t>( | |
73 | "rbd_op_thread_timeout")), | |
74 | &m_thread_pool) | |
75 | { | |
76 | CephContext *cct = m_image_ctx.cct; | |
77 | m_plugin_api.get_image_timer_instance(cct, &m_timer, &m_timer_lock); | |
78 | } | |
79 | ||
80 | template <typename I> | |
81 | AbstractWriteLog<I>::~AbstractWriteLog() { | |
82 | ldout(m_image_ctx.cct, 15) << "enter" << dendl; | |
83 | { | |
84 | std::lock_guard timer_locker(*m_timer_lock); | |
85 | std::lock_guard locker(m_lock); | |
86 | m_timer->cancel_event(m_timer_ctx); | |
87 | m_thread_pool.stop(); | |
88 | ceph_assert(m_deferred_ios.size() == 0); | |
89 | ceph_assert(m_ops_to_flush.size() == 0); | |
90 | ceph_assert(m_ops_to_append.size() == 0); | |
91 | ceph_assert(m_flush_ops_in_flight == 0); | |
92 | ||
93 | delete m_cache_state; | |
94 | m_cache_state = nullptr; | |
95 | } | |
96 | ldout(m_image_ctx.cct, 15) << "exit" << dendl; | |
97 | } | |
98 | ||
99 | template <typename I> | |
100 | void AbstractWriteLog<I>::perf_start(std::string name) { | |
101 | PerfCountersBuilder plb(m_image_ctx.cct, name, l_librbd_pwl_first, | |
102 | l_librbd_pwl_last); | |
103 | ||
104 | // Latency axis configuration for op histograms, values are in nanoseconds | |
105 | PerfHistogramCommon::axis_config_d op_hist_x_axis_config{ | |
106 | "Latency (nsec)", | |
107 | PerfHistogramCommon::SCALE_LOG2, ///< Latency in logarithmic scale | |
108 | 0, ///< Start at 0 | |
109 | 5000, ///< Quantization unit is 5usec | |
110 | 16, ///< Ranges into the mS | |
111 | }; | |
112 | ||
113 | // Syncpoint logentry number x-axis configuration for op histograms | |
114 | PerfHistogramCommon::axis_config_d sp_logentry_number_config{ | |
115 | "logentry number", | |
116 | PerfHistogramCommon::SCALE_LINEAR, // log entry number in linear scale | |
117 | 0, // Start at 0 | |
118 | 1, // Quantization unit is 1 | |
119 | 260, // Up to 260 > (MAX_WRITES_PER_SYNC_POINT) | |
120 | }; | |
121 | ||
122 | // Syncpoint bytes number y-axis configuration for op histogram | |
123 | PerfHistogramCommon::axis_config_d sp_bytes_number_config{ | |
124 | "Number of SyncPoint", | |
125 | PerfHistogramCommon::SCALE_LOG2, // Request size in logarithmic scale | |
126 | 0, // Start at 0 | |
127 | 512, // Quantization unit is 512 | |
128 | 17, // Writes up to 8M >= MAX_BYTES_PER_SYNC_POINT | |
129 | }; | |
130 | ||
131 | // Op size axis configuration for op histogram y axis, values are in bytes | |
132 | PerfHistogramCommon::axis_config_d op_hist_y_axis_config{ | |
133 | "Request size (bytes)", | |
134 | PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale | |
135 | 0, ///< Start at 0 | |
136 | 512, ///< Quantization unit is 512 bytes | |
137 | 16, ///< Writes up to >32k | |
138 | }; | |
139 | ||
140 | // Num items configuration for op histogram y axis, values are in items | |
141 | PerfHistogramCommon::axis_config_d op_hist_y_axis_count_config{ | |
142 | "Number of items", | |
143 | PerfHistogramCommon::SCALE_LINEAR, ///< Request size in linear scale | |
144 | 0, ///< Start at 0 | |
145 | 1, ///< Quantization unit is 1 | |
146 | 32, ///< Writes up to >32k | |
147 | }; | |
148 | ||
149 | plb.add_u64_counter(l_librbd_pwl_rd_req, "rd", "Reads"); | |
150 | plb.add_u64_counter(l_librbd_pwl_rd_bytes, "rd_bytes", "Data size in reads"); | |
151 | plb.add_time_avg(l_librbd_pwl_rd_latency, "rd_latency", "Latency of reads"); | |
152 | ||
153 | plb.add_u64_counter(l_librbd_pwl_rd_hit_req, "hit_rd", "Reads completely hitting RWL"); | |
154 | plb.add_u64_counter(l_librbd_pwl_rd_hit_bytes, "rd_hit_bytes", "Bytes read from RWL"); | |
155 | plb.add_time_avg(l_librbd_pwl_rd_hit_latency, "hit_rd_latency", "Latency of read hits"); | |
156 | ||
157 | plb.add_u64_counter(l_librbd_pwl_rd_part_hit_req, "part_hit_rd", "reads partially hitting RWL"); | |
158 | ||
159 | plb.add_u64_counter_histogram( | |
160 | l_librbd_pwl_syncpoint_hist, "syncpoint_logentry_bytes_histogram", | |
161 | sp_logentry_number_config, sp_bytes_number_config, | |
162 | "Histogram of syncpoint's logentry numbers vs bytes number"); | |
163 | ||
164 | plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes"); | |
a4b75251 | 165 | plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes"); |
f67539c2 TL |
166 | plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources"); |
167 | plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes"); | |
168 | plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries"); | |
169 | plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers"); | |
170 | plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes"); | |
171 | plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)"); | |
f67539c2 TL |
172 | |
173 | plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends"); | |
174 | plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes"); | |
175 | ||
176 | plb.add_time_avg( | |
177 | l_librbd_pwl_req_arr_to_all_t, "req_arr_to_all_t", | |
178 | "Average arrival to allocation time (time deferred for overlap)"); | |
179 | plb.add_time_avg( | |
180 | l_librbd_pwl_req_arr_to_dis_t, "req_arr_to_dis_t", | |
181 | "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); | |
182 | plb.add_time_avg( | |
183 | l_librbd_pwl_req_all_to_dis_t, "req_all_to_dis_t", | |
184 | "Average allocation to dispatch time (time deferred for log resources)"); | |
185 | plb.add_time_avg( | |
186 | l_librbd_pwl_wr_latency, "wr_latency", | |
187 | "Latency of writes (persistent completion)"); | |
188 | plb.add_u64_counter_histogram( | |
189 | l_librbd_pwl_wr_latency_hist, "wr_latency_bytes_histogram", | |
190 | op_hist_x_axis_config, op_hist_y_axis_config, | |
191 | "Histogram of write request latency (nanoseconds) vs. bytes written"); | |
192 | plb.add_time_avg( | |
193 | l_librbd_pwl_wr_caller_latency, "caller_wr_latency", | |
194 | "Latency of write completion to caller"); | |
195 | plb.add_time_avg( | |
196 | l_librbd_pwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t", | |
197 | "Average arrival to allocation time (time deferred for overlap)"); | |
198 | plb.add_time_avg( | |
199 | l_librbd_pwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t", | |
200 | "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); | |
201 | plb.add_time_avg( | |
202 | l_librbd_pwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t", | |
203 | "Average allocation to dispatch time (time deferred for log resources)"); | |
204 | plb.add_time_avg( | |
205 | l_librbd_pwl_nowait_wr_latency, "wr_latency_nw", | |
206 | "Latency of writes (persistent completion) not deferred for free space"); | |
207 | plb.add_u64_counter_histogram( | |
208 | l_librbd_pwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram", | |
209 | op_hist_x_axis_config, op_hist_y_axis_config, | |
210 | "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space"); | |
211 | plb.add_time_avg( | |
212 | l_librbd_pwl_nowait_wr_caller_latency, "caller_wr_latency_nw", | |
213 | "Latency of write completion to callerfor writes not deferred for free space"); | |
214 | plb.add_time_avg(l_librbd_pwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time"); | |
215 | plb.add_u64_counter_histogram( | |
216 | l_librbd_pwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram", | |
217 | op_hist_x_axis_config, op_hist_y_axis_config, | |
218 | "Histogram of buffer pmemobj_reserve() time (nanoseconds) vs. bytes written"); | |
219 | plb.add_time_avg(l_librbd_pwl_log_op_dis_to_buf_t, "op_dis_to_buf_t", "Average dispatch to buffer persist time"); | |
220 | plb.add_time_avg(l_librbd_pwl_log_op_dis_to_app_t, "op_dis_to_app_t", "Average dispatch to log append time"); | |
221 | plb.add_time_avg(l_librbd_pwl_log_op_dis_to_cmp_t, "op_dis_to_cmp_t", "Average dispatch to persist completion time"); | |
222 | plb.add_u64_counter_histogram( | |
223 | l_librbd_pwl_log_op_dis_to_cmp_t_hist, "op_dis_to_cmp_t_bytes_histogram", | |
224 | op_hist_x_axis_config, op_hist_y_axis_config, | |
225 | "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written"); | |
226 | ||
227 | plb.add_time_avg( | |
228 | l_librbd_pwl_log_op_buf_to_app_t, "op_buf_to_app_t", | |
229 | "Average buffer persist to log append time (write data persist/replicate + wait for append time)"); | |
230 | plb.add_time_avg( | |
231 | l_librbd_pwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t", | |
232 | "Average buffer persist time (write data persist/replicate time)"); | |
233 | plb.add_u64_counter_histogram( | |
234 | l_librbd_pwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram", | |
235 | op_hist_x_axis_config, op_hist_y_axis_config, | |
236 | "Histogram of write buffer persist time (nanoseconds) vs. bytes written"); | |
237 | plb.add_time_avg( | |
238 | l_librbd_pwl_log_op_app_to_cmp_t, "op_app_to_cmp_t", | |
239 | "Average log append to persist complete time (log entry append/replicate + wait for complete time)"); | |
240 | plb.add_time_avg( | |
241 | l_librbd_pwl_log_op_app_to_appc_t, "op_app_to_appc_t", | |
242 | "Average log append to persist complete time (log entry append/replicate time)"); | |
243 | plb.add_u64_counter_histogram( | |
244 | l_librbd_pwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram", | |
245 | op_hist_x_axis_config, op_hist_y_axis_config, | |
246 | "Histogram of log append persist time (nanoseconds) (vs. op bytes)"); | |
247 | ||
248 | plb.add_u64_counter(l_librbd_pwl_discard, "discard", "Discards"); | |
249 | plb.add_u64_counter(l_librbd_pwl_discard_bytes, "discard_bytes", "Bytes discarded"); | |
250 | plb.add_time_avg(l_librbd_pwl_discard_latency, "discard_lat", "Discard latency"); | |
251 | ||
252 | plb.add_u64_counter(l_librbd_pwl_aio_flush, "aio_flush", "AIO flush (flush to RWL)"); | |
253 | plb.add_u64_counter(l_librbd_pwl_aio_flush_def, "aio_flush_def", "AIO flushes deferred for resources"); | |
254 | plb.add_time_avg(l_librbd_pwl_aio_flush_latency, "aio_flush_lat", "AIO flush latency"); | |
255 | ||
256 | plb.add_u64_counter(l_librbd_pwl_ws,"ws", "Write Sames"); | |
257 | plb.add_u64_counter(l_librbd_pwl_ws_bytes, "ws_bytes", "Write Same bytes to image"); | |
258 | plb.add_time_avg(l_librbd_pwl_ws_latency, "ws_lat", "Write Same latency"); | |
259 | ||
260 | plb.add_u64_counter(l_librbd_pwl_cmp, "cmp", "Compare and Write requests"); | |
261 | plb.add_u64_counter(l_librbd_pwl_cmp_bytes, "cmp_bytes", "Compare and Write bytes compared/written"); | |
262 | plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latecy"); | |
263 | plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails"); | |
264 | ||
a4b75251 TL |
265 | plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)"); |
266 | plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "write back to OSD latency"); | |
f67539c2 TL |
267 | plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL"); |
268 | plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "discard", "Discard and invalidate RWL"); | |
269 | ||
270 | plb.add_time_avg(l_librbd_pwl_append_tx_t, "append_tx_lat", "Log append transaction latency"); | |
271 | plb.add_u64_counter_histogram( | |
272 | l_librbd_pwl_append_tx_t_hist, "append_tx_lat_histogram", | |
273 | op_hist_x_axis_config, op_hist_y_axis_count_config, | |
274 | "Histogram of log append transaction time (nanoseconds) vs. entries appended"); | |
275 | plb.add_time_avg(l_librbd_pwl_retire_tx_t, "retire_tx_lat", "Log retire transaction latency"); | |
276 | plb.add_u64_counter_histogram( | |
277 | l_librbd_pwl_retire_tx_t_hist, "retire_tx_lat_histogram", | |
278 | op_hist_x_axis_config, op_hist_y_axis_count_config, | |
279 | "Histogram of log retire transaction time (nanoseconds) vs. entries retired"); | |
280 | ||
281 | m_perfcounter = plb.create_perf_counters(); | |
282 | m_image_ctx.cct->get_perfcounters_collection()->add(m_perfcounter); | |
283 | } | |
284 | ||
285 | template <typename I> | |
286 | void AbstractWriteLog<I>::perf_stop() { | |
287 | ceph_assert(m_perfcounter); | |
288 | m_image_ctx.cct->get_perfcounters_collection()->remove(m_perfcounter); | |
289 | delete m_perfcounter; | |
290 | } | |
291 | ||
292 | template <typename I> | |
293 | void AbstractWriteLog<I>::log_perf() { | |
294 | bufferlist bl; | |
295 | Formatter *f = Formatter::create("json-pretty"); | |
296 | bl.append("Perf dump follows\n--- Begin perf dump ---\n"); | |
297 | bl.append("{\n"); | |
298 | stringstream ss; | |
299 | utime_t now = ceph_clock_now(); | |
300 | ss << "\"test_time\": \"" << now << "\","; | |
301 | ss << "\"image\": \"" << m_image_ctx.name << "\","; | |
302 | bl.append(ss); | |
303 | bl.append("\"stats\": "); | |
304 | m_image_ctx.cct->get_perfcounters_collection()->dump_formatted(f, 0); | |
305 | f->flush(bl); | |
306 | bl.append(",\n\"histograms\": "); | |
307 | m_image_ctx.cct->get_perfcounters_collection()->dump_formatted_histograms(f, 0); | |
308 | f->flush(bl); | |
309 | delete f; | |
310 | bl.append("}\n--- End perf dump ---\n"); | |
311 | bl.append('\0'); | |
312 | ldout(m_image_ctx.cct, 1) << bl.c_str() << dendl; | |
313 | } | |
314 | ||
315 | template <typename I> | |
316 | void AbstractWriteLog<I>::periodic_stats() { | |
317 | std::lock_guard locker(m_lock); | |
33c7a0ef TL |
318 | update_image_cache_state(); |
319 | ldout(m_image_ctx.cct, 5) << "STATS: m_log_entries=" << m_log_entries.size() | |
a4b75251 TL |
320 | << ", m_dirty_log_entries=" << m_dirty_log_entries.size() |
321 | << ", m_free_log_entries=" << m_free_log_entries | |
322 | << ", m_bytes_allocated=" << m_bytes_allocated | |
323 | << ", m_bytes_cached=" << m_bytes_cached | |
324 | << ", m_bytes_dirty=" << m_bytes_dirty | |
325 | << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated | |
326 | << ", m_first_valid_entry=" << m_first_valid_entry | |
327 | << ", m_first_free_entry=" << m_first_free_entry | |
328 | << ", m_current_sync_gen=" << m_current_sync_gen | |
329 | << ", m_flushed_sync_gen=" << m_flushed_sync_gen | |
f67539c2 TL |
330 | << dendl; |
331 | } | |
332 | ||
333 | template <typename I> | |
334 | void AbstractWriteLog<I>::arm_periodic_stats() { | |
335 | ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); | |
33c7a0ef TL |
336 | m_timer_ctx = new LambdaContext([this](int r) { |
337 | /* m_timer_lock is held */ | |
338 | periodic_stats(); | |
339 | arm_periodic_stats(); | |
340 | }); | |
341 | m_timer->add_event_after(LOG_STATS_INTERVAL_SECONDS, m_timer_ctx); | |
f67539c2 TL |
342 | } |
343 | ||
344 | template <typename I> | |
345 | void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry, | |
346 | WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points, | |
347 | std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries, | |
a4b75251 | 348 | uint64_t entry_index) { |
f67539c2 TL |
349 | bool writer = cache_entry->is_writer(); |
350 | if (cache_entry->is_sync_point()) { | |
351 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
352 | << " is a sync point. cache_entry=[" << *cache_entry << "]" << dendl; | |
353 | auto sync_point_entry = std::make_shared<SyncPointLogEntry>(cache_entry->sync_gen_number); | |
354 | *log_entry = sync_point_entry; | |
355 | sync_point_entries[cache_entry->sync_gen_number] = sync_point_entry; | |
356 | missing_sync_points.erase(cache_entry->sync_gen_number); | |
357 | m_current_sync_gen = cache_entry->sync_gen_number; | |
358 | } else if (cache_entry->is_write()) { | |
359 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
360 | << " is a write. cache_entry=[" << *cache_entry << "]" << dendl; | |
361 | auto write_entry = | |
362 | m_builder->create_write_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes); | |
363 | write_data_to_buffer(write_entry, cache_entry); | |
364 | *log_entry = write_entry; | |
365 | } else if (cache_entry->is_writesame()) { | |
366 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
367 | << " is a write same. cache_entry=[" << *cache_entry << "]" << dendl; | |
368 | auto ws_entry = | |
369 | m_builder->create_writesame_log_entry(nullptr, cache_entry->image_offset_bytes, | |
370 | cache_entry->write_bytes, cache_entry->ws_datalen); | |
371 | write_data_to_buffer(ws_entry, cache_entry); | |
372 | *log_entry = ws_entry; | |
373 | } else if (cache_entry->is_discard()) { | |
374 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
375 | << " is a discard. cache_entry=[" << *cache_entry << "]" << dendl; | |
376 | auto discard_entry = | |
377 | std::make_shared<DiscardLogEntry>(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes, | |
378 | m_discard_granularity_bytes); | |
379 | *log_entry = discard_entry; | |
380 | } else { | |
381 | lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index | |
382 | << ", cache_entry=[" << *cache_entry << "]" << dendl; | |
383 | } | |
384 | ||
385 | if (writer) { | |
386 | ldout(m_image_ctx.cct, 20) << "Entry " << entry_index | |
387 | << " writes. cache_entry=[" << *cache_entry << "]" << dendl; | |
388 | if (!sync_point_entries[cache_entry->sync_gen_number]) { | |
389 | missing_sync_points[cache_entry->sync_gen_number] = true; | |
390 | } | |
391 | } | |
392 | } | |
393 | ||
394 | template <typename I> | |
395 | void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points, | |
396 | std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries, | |
a4b75251 | 397 | DeferredContexts &later) { |
f67539c2 TL |
398 | /* Create missing sync points. These must not be appended until the |
399 | * entry reload is complete and the write map is up to | |
400 | * date. Currently this is handled by the deferred contexts object | |
401 | * passed to new_sync_point(). These contexts won't be completed | |
402 | * until this function returns. */ | |
403 | for (auto &kv : missing_sync_points) { | |
404 | ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl; | |
405 | if (0 == m_current_sync_gen) { | |
406 | /* The unlikely case where the log contains writing entries, but no sync | |
407 | * points (e.g. because they were all retired) */ | |
408 | m_current_sync_gen = kv.first-1; | |
409 | } | |
410 | ceph_assert(kv.first == m_current_sync_gen+1); | |
411 | init_flush_new_sync_point(later); | |
412 | ceph_assert(kv.first == m_current_sync_gen); | |
20effc67 | 413 | sync_point_entries[kv.first] = m_current_sync_point->log_entry; |
f67539c2 TL |
414 | } |
415 | ||
416 | /* | |
417 | * Iterate over the log entries again (this time via the global | |
418 | * entries list), connecting write entries to their sync points and | |
419 | * updating the sync point stats. | |
420 | * | |
421 | * Add writes to the write log map. | |
422 | */ | |
423 | std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr; | |
424 | for (auto &log_entry : m_log_entries) { | |
425 | if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) { | |
426 | /* This entry is one of the types that write */ | |
427 | auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry); | |
428 | if (gen_write_entry) { | |
429 | auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number]; | |
430 | if (!sync_point_entry) { | |
431 | lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl; | |
432 | ceph_assert(false); | |
433 | } else { | |
434 | gen_write_entry->sync_point_entry = sync_point_entry; | |
435 | sync_point_entry->writes++; | |
436 | sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes; | |
437 | sync_point_entry->writes_completed++; | |
438 | m_blocks_to_log_entries.add_log_entry(gen_write_entry); | |
439 | /* This entry is only dirty if its sync gen number is > the flushed | |
440 | * sync gen number from the root object. */ | |
441 | if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) { | |
442 | m_dirty_log_entries.push_back(log_entry); | |
443 | m_bytes_dirty += gen_write_entry->bytes_dirty(); | |
444 | } else { | |
445 | gen_write_entry->set_flushed(true); | |
446 | sync_point_entry->writes_flushed++; | |
447 | } | |
a4b75251 TL |
448 | |
449 | /* calc m_bytes_allocated & m_bytes_cached */ | |
450 | inc_allocated_cached_bytes(log_entry); | |
f67539c2 TL |
451 | } |
452 | } | |
453 | } else { | |
454 | /* This entry is sync point entry */ | |
455 | auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry); | |
456 | if (sync_point_entry) { | |
457 | if (previous_sync_point_entry) { | |
458 | previous_sync_point_entry->next_sync_point_entry = sync_point_entry; | |
459 | if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) { | |
460 | sync_point_entry->prior_sync_point_flushed = false; | |
461 | ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed || | |
462 | (0 == previous_sync_point_entry->writes) || | |
463 | (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed)); | |
464 | } else { | |
465 | sync_point_entry->prior_sync_point_flushed = true; | |
466 | ceph_assert(previous_sync_point_entry->prior_sync_point_flushed); | |
467 | ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed); | |
468 | } | |
469 | } else { | |
470 | /* There are no previous sync points, so we'll consider them flushed */ | |
471 | sync_point_entry->prior_sync_point_flushed = true; | |
472 | } | |
473 | previous_sync_point_entry = sync_point_entry; | |
474 | ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << dendl; | |
475 | } | |
476 | } | |
477 | } | |
478 | if (0 == m_current_sync_gen) { | |
479 | /* If a re-opened log was completely flushed, we'll have found no sync point entries here, | |
480 | * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync | |
481 | * point recorded in the log. */ | |
482 | m_current_sync_gen = m_flushed_sync_gen; | |
483 | } | |
484 | } | |
485 | ||
486 | template <typename I> | |
487 | void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) { | |
488 | CephContext *cct = m_image_ctx.cct; | |
489 | ldout(cct, 20) << dendl; | |
490 | ceph_assert(m_cache_state); | |
491 | std::lock_guard locker(m_lock); | |
492 | ceph_assert(!m_initialized); | |
493 | ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl; | |
494 | ||
495 | if (!m_cache_state->present) { | |
496 | m_cache_state->host = ceph_get_short_hostname(); | |
497 | m_cache_state->size = m_image_ctx.config.template get_val<uint64_t>( | |
498 | "rbd_persistent_cache_size"); | |
499 | ||
500 | string path = m_image_ctx.config.template get_val<string>( | |
501 | "rbd_persistent_cache_path"); | |
502 | std::string pool_name = m_image_ctx.md_ctx.get_pool_name(); | |
503 | m_cache_state->path = path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".pool"; | |
504 | } | |
505 | ||
506 | ldout(cct,5) << "pwl_size: " << m_cache_state->size << dendl; | |
507 | ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl; | |
508 | ||
509 | m_log_pool_name = m_cache_state->path; | |
a4b75251 TL |
510 | m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE); |
511 | m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN); | |
512 | ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size | |
513 | << " (adjusted from " << m_cache_state->size << ")" << dendl; | |
f67539c2 TL |
514 | |
515 | if ((!m_cache_state->present) && | |
516 | (access(m_log_pool_name.c_str(), F_OK) == 0)) { | |
517 | ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name | |
518 | << ", While there's no cache in the image metatata." << dendl; | |
519 | if (remove(m_log_pool_name.c_str()) != 0) { | |
20effc67 | 520 | lderr(cct) << "failed to remove the pool file " << m_log_pool_name |
f67539c2 TL |
521 | << dendl; |
522 | on_finish->complete(-errno); | |
523 | return; | |
524 | } else { | |
525 | ldout(cct, 5) << "Removed the existing pool file." << dendl; | |
526 | } | |
527 | } else if ((m_cache_state->present) && | |
528 | (access(m_log_pool_name.c_str(), F_OK) != 0)) { | |
20effc67 TL |
529 | lderr(cct) << "can't find the existed pool file: " << m_log_pool_name |
530 | << ". error: " << cpp_strerror(-errno) << dendl; | |
f67539c2 | 531 | on_finish->complete(-errno); |
a4b75251 | 532 | return; |
f67539c2 TL |
533 | } |
534 | ||
a4b75251 TL |
535 | bool succeeded = initialize_pool(on_finish, later); |
536 | if (!succeeded) { | |
537 | return ; | |
538 | } | |
f67539c2 TL |
539 | |
540 | ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries | |
541 | << " log entries, " << m_free_log_entries << " of which are free." | |
542 | << " first_valid=" << m_first_valid_entry | |
543 | << ", first_free=" << m_first_free_entry | |
544 | << ", flushed_sync_gen=" << m_flushed_sync_gen | |
545 | << ", m_current_sync_gen=" << m_current_sync_gen << dendl; | |
546 | if (m_first_free_entry == m_first_valid_entry) { | |
547 | ldout(cct,1) << "write log is empty" << dendl; | |
548 | m_cache_state->empty = true; | |
549 | } | |
550 | ||
551 | /* Start the sync point following the last one seen in the | |
552 | * log. Flush the last sync point created during the loading of the | |
553 | * existing log entries. */ | |
554 | init_flush_new_sync_point(later); | |
555 | ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl; | |
556 | ||
557 | m_initialized = true; | |
558 | // Start the thread | |
559 | m_thread_pool.start(); | |
560 | ||
f67539c2 TL |
561 | /* Do these after we drop lock */ |
562 | later.add(new LambdaContext([this](int r) { | |
f67539c2 TL |
563 | /* Log stats for the first time */ |
564 | periodic_stats(); | |
565 | /* Arm periodic stats logging for the first time */ | |
566 | std::lock_guard timer_locker(*m_timer_lock); | |
567 | arm_periodic_stats(); | |
33c7a0ef | 568 | })); |
f67539c2 TL |
569 | m_image_ctx.op_work_queue->queue(on_finish, 0); |
570 | } | |
571 | ||
33c7a0ef TL |
572 | template <typename I> |
573 | void AbstractWriteLog<I>::update_image_cache_state() { | |
574 | using klass = AbstractWriteLog<I>; | |
575 | Context *ctx = util::create_context_callback< | |
576 | klass, &klass::handle_update_image_cache_state>(this); | |
577 | update_image_cache_state(ctx); | |
578 | } | |
579 | ||
f67539c2 TL |
580 | template <typename I> |
581 | void AbstractWriteLog<I>::update_image_cache_state(Context *on_finish) { | |
33c7a0ef TL |
582 | ldout(m_image_ctx.cct, 10) << dendl; |
583 | ||
584 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
585 | m_cache_state->allocated_bytes = m_bytes_allocated; | |
586 | m_cache_state->cached_bytes = m_bytes_cached; | |
587 | m_cache_state->dirty_bytes = m_bytes_dirty; | |
588 | m_cache_state->free_bytes = m_bytes_allocated_cap - m_bytes_allocated; | |
589 | m_cache_state->hits_full = m_perfcounter->get(l_librbd_pwl_rd_hit_req); | |
590 | m_cache_state->hits_partial = m_perfcounter->get(l_librbd_pwl_rd_part_hit_req); | |
591 | m_cache_state->misses = m_perfcounter->get(l_librbd_pwl_rd_req) - | |
592 | m_cache_state->hits_full - m_cache_state->hits_partial; | |
593 | m_cache_state->hit_bytes = m_perfcounter->get(l_librbd_pwl_rd_hit_bytes); | |
594 | m_cache_state->miss_bytes = m_perfcounter->get(l_librbd_pwl_rd_bytes) - | |
595 | m_cache_state->hit_bytes; | |
f67539c2 TL |
596 | m_cache_state->write_image_cache_state(on_finish); |
597 | } | |
598 | ||
33c7a0ef TL |
599 | template <typename I> |
600 | void AbstractWriteLog<I>::handle_update_image_cache_state(int r) { | |
601 | CephContext *cct = m_image_ctx.cct; | |
602 | ldout(cct, 10) << "r=" << r << dendl; | |
603 | ||
604 | if (r < 0) { | |
605 | lderr(cct) << "failed to update image cache state: " << cpp_strerror(r) | |
606 | << dendl; | |
607 | return; | |
608 | } | |
609 | } | |
610 | ||
f67539c2 TL |
611 | template <typename I> |
612 | void AbstractWriteLog<I>::init(Context *on_finish) { | |
613 | CephContext *cct = m_image_ctx.cct; | |
614 | ldout(cct, 20) << dendl; | |
a4b75251 TL |
615 | auto pname = std::string("librbd-pwl-") + m_image_ctx.id + |
616 | std::string("-") + m_image_ctx.md_ctx.get_pool_name() + | |
617 | std::string("-") + m_image_ctx.name; | |
618 | perf_start(pname); | |
f67539c2 TL |
619 | |
620 | ceph_assert(!m_initialized); | |
621 | ||
622 | Context *ctx = new LambdaContext( | |
623 | [this, on_finish](int r) { | |
624 | if (r >= 0) { | |
33c7a0ef | 625 | std::lock_guard locker(m_lock); |
f67539c2 TL |
626 | update_image_cache_state(on_finish); |
627 | } else { | |
628 | on_finish->complete(r); | |
629 | } | |
630 | }); | |
631 | ||
632 | DeferredContexts later; | |
633 | pwl_init(ctx, later); | |
634 | } | |
635 | ||
636 | template <typename I> | |
637 | void AbstractWriteLog<I>::shut_down(Context *on_finish) { | |
638 | CephContext *cct = m_image_ctx.cct; | |
639 | ldout(cct, 20) << dendl; | |
640 | ||
641 | ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl; | |
642 | ||
643 | Context *ctx = new LambdaContext( | |
644 | [this, on_finish](int r) { | |
33c7a0ef TL |
645 | if (m_perfcounter) { |
646 | perf_stop(); | |
647 | } | |
f67539c2 TL |
648 | ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl; |
649 | m_image_ctx.op_work_queue->queue(on_finish, r); | |
650 | }); | |
651 | ctx = new LambdaContext( | |
652 | [this, ctx](int r) { | |
653 | ldout(m_image_ctx.cct, 6) << "image cache cleaned" << dendl; | |
654 | Context *next_ctx = override_ctx(r, ctx); | |
33c7a0ef | 655 | periodic_stats(); |
f67539c2 | 656 | |
33c7a0ef TL |
657 | std::lock_guard locker(m_lock); |
658 | check_image_cache_state_clean(); | |
659 | m_wake_up_enabled = false; | |
660 | m_log_entries.clear(); | |
661 | m_cache_state->clean = true; | |
662 | m_cache_state->empty = true; | |
663 | remove_pool_file(); | |
f67539c2 TL |
664 | update_image_cache_state(next_ctx); |
665 | }); | |
a4b75251 TL |
666 | ctx = new LambdaContext( |
667 | [this, ctx](int r) { | |
668 | Context *next_ctx = override_ctx(r, ctx); | |
669 | ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl; | |
670 | // Wait for in progress IOs to complete | |
671 | next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx); | |
672 | m_async_op_tracker.wait_for_ops(next_ctx); | |
673 | }); | |
f67539c2 TL |
674 | ctx = new LambdaContext( |
675 | [this, ctx](int r) { | |
676 | Context *next_ctx = override_ctx(r, ctx); | |
677 | { | |
678 | /* Sync with process_writeback_dirty_entries() */ | |
679 | RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock); | |
680 | m_shutting_down = true; | |
681 | /* Flush all writes to OSDs (unless disabled) and wait for all | |
682 | in-progress flush writes to complete */ | |
683 | ldout(m_image_ctx.cct, 6) << "flushing" << dendl; | |
33c7a0ef | 684 | periodic_stats(); |
f67539c2 TL |
685 | } |
686 | flush_dirty_entries(next_ctx); | |
687 | }); | |
f67539c2 TL |
688 | ctx = new LambdaContext( |
689 | [this, ctx](int r) { | |
690 | ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl; | |
691 | m_work_queue.queue(ctx, r); | |
692 | }); | |
693 | /* Complete all in-flight writes before shutting down */ | |
694 | ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl; | |
695 | internal_flush(false, ctx); | |
696 | } | |
697 | ||
698 | template <typename I> | |
699 | void AbstractWriteLog<I>::read(Extents&& image_extents, | |
700 | ceph::bufferlist* bl, | |
701 | int fadvise_flags, Context *on_finish) { | |
702 | CephContext *cct = m_image_ctx.cct; | |
703 | utime_t now = ceph_clock_now(); | |
704 | ||
705 | on_finish = new LambdaContext( | |
706 | [this, on_finish](int r) { | |
707 | m_async_op_tracker.finish_op(); | |
708 | on_finish->complete(r); | |
709 | }); | |
710 | C_ReadRequest *read_ctx = m_builder->create_read_request( | |
711 | cct, now, m_perfcounter, bl, on_finish); | |
712 | ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id | |
20effc67 TL |
713 | << "image_extents=" << image_extents |
714 | << ", bl=" << bl | |
715 | << ", on_finish=" << on_finish << dendl; | |
f67539c2 TL |
716 | |
717 | ceph_assert(m_initialized); | |
718 | bl->clear(); | |
719 | m_perfcounter->inc(l_librbd_pwl_rd_req, 1); | |
720 | ||
a4b75251 | 721 | std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read; |
f67539c2 TL |
722 | std::vector<bufferlist*> bls_to_read; |
723 | ||
724 | m_async_op_tracker.start_op(); | |
725 | Context *ctx = new LambdaContext( | |
726 | [this, read_ctx, fadvise_flags](int r) { | |
727 | if (read_ctx->miss_extents.empty()) { | |
728 | /* All of this read comes from RWL */ | |
729 | read_ctx->complete(0); | |
730 | } else { | |
731 | /* Pass the read misses on to the layer below RWL */ | |
732 | m_image_writeback.aio_read( | |
733 | std::move(read_ctx->miss_extents), &read_ctx->miss_bl, | |
734 | fadvise_flags, read_ctx); | |
735 | } | |
736 | }); | |
737 | ||
738 | /* | |
739 | * The strategy here is to look up all the WriteLogMapEntries that overlap | |
740 | * this read, and iterate through those to separate this read into hits and | |
741 | * misses. A new Extents object is produced here with Extents for each miss | |
742 | * region. The miss Extents is then passed on to the read cache below RWL. We | |
743 | * also produce an ImageExtentBufs for all the extents (hit or miss) in this | |
744 | * read. When the read from the lower cache layer completes, we iterate | |
745 | * through the ImageExtentBufs and insert buffers for each cache hit at the | |
746 | * appropriate spot in the bufferlist returned from below for the miss | |
747 | * read. The buffers we insert here refer directly to regions of various | |
748 | * write log entry data buffers. | |
749 | * | |
750 | * Locking: These buffer objects hold a reference on the write log entries | |
751 | * they refer to. Log entries can't be retired until there are no references. | |
752 | * The GenericWriteLogEntry references are released by the buffer destructor. | |
753 | */ | |
754 | for (auto &extent : image_extents) { | |
755 | uint64_t extent_offset = 0; | |
756 | RWLock::RLocker entry_reader_locker(m_entry_reader_lock); | |
757 | WriteLogMapEntries map_entries = m_blocks_to_log_entries.find_map_entries( | |
758 | block_extent(extent)); | |
759 | for (auto &map_entry : map_entries) { | |
760 | Extent entry_image_extent(pwl::image_extent(map_entry.block_extent)); | |
761 | /* If this map entry starts after the current image extent offset ... */ | |
762 | if (entry_image_extent.first > extent.first + extent_offset) { | |
763 | /* ... add range before map_entry to miss extents */ | |
764 | uint64_t miss_extent_start = extent.first + extent_offset; | |
765 | uint64_t miss_extent_length = entry_image_extent.first - | |
766 | miss_extent_start; | |
767 | Extent miss_extent(miss_extent_start, miss_extent_length); | |
768 | read_ctx->miss_extents.push_back(miss_extent); | |
769 | /* Add miss range to read extents */ | |
770 | auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent); | |
771 | read_ctx->read_extents.push_back(miss_extent_buf); | |
772 | extent_offset += miss_extent_length; | |
773 | } | |
774 | ceph_assert(entry_image_extent.first <= extent.first + extent_offset); | |
775 | uint64_t entry_offset = 0; | |
776 | /* If this map entry starts before the current image extent offset ... */ | |
777 | if (entry_image_extent.first < extent.first + extent_offset) { | |
778 | /* ... compute offset into log entry for this read extent */ | |
779 | entry_offset = (extent.first + extent_offset) - entry_image_extent.first; | |
780 | } | |
781 | /* This read hit ends at the end of the extent or the end of the log | |
782 | entry, whichever is less. */ | |
783 | uint64_t entry_hit_length = min(entry_image_extent.second - entry_offset, | |
784 | extent.second - extent_offset); | |
785 | Extent hit_extent(entry_image_extent.first, entry_hit_length); | |
786 | if (0 == map_entry.log_entry->write_bytes() && | |
787 | 0 < map_entry.log_entry->bytes_dirty()) { | |
788 | /* discard log entry */ | |
789 | ldout(cct, 20) << "discard log entry" << dendl; | |
790 | auto discard_entry = map_entry.log_entry; | |
791 | ldout(cct, 20) << "read hit on discard entry: log_entry=" | |
792 | << *discard_entry | |
793 | << dendl; | |
794 | /* Discards read as zero, so we'll construct a bufferlist of zeros */ | |
795 | bufferlist zero_bl; | |
796 | zero_bl.append_zero(entry_hit_length); | |
797 | /* Add hit extent to read extents */ | |
798 | auto hit_extent_buf = std::make_shared<ImageExtentBuf>( | |
799 | hit_extent, zero_bl); | |
800 | read_ctx->read_extents.push_back(hit_extent_buf); | |
801 | } else { | |
802 | ldout(cct, 20) << "write or writesame log entry" << dendl; | |
803 | /* write and writesame log entry */ | |
804 | /* Offset of the map entry into the log entry's buffer */ | |
805 | uint64_t map_entry_buffer_offset = entry_image_extent.first - | |
806 | map_entry.log_entry->ram_entry.image_offset_bytes; | |
807 | /* Offset into the log entry buffer of this read hit */ | |
808 | uint64_t read_buffer_offset = map_entry_buffer_offset + entry_offset; | |
809 | /* Create buffer object referring to pmem pool for this read hit */ | |
810 | collect_read_extents( | |
811 | read_buffer_offset, map_entry, log_entries_to_read, bls_to_read, | |
812 | entry_hit_length, hit_extent, read_ctx); | |
813 | } | |
814 | /* Exclude RWL hit range from buffer and extent */ | |
815 | extent_offset += entry_hit_length; | |
816 | ldout(cct, 20) << map_entry << dendl; | |
817 | } | |
818 | /* If the last map entry didn't consume the entire image extent ... */ | |
819 | if (extent.second > extent_offset) { | |
820 | /* ... add the rest of this extent to miss extents */ | |
821 | uint64_t miss_extent_start = extent.first + extent_offset; | |
822 | uint64_t miss_extent_length = extent.second - extent_offset; | |
823 | Extent miss_extent(miss_extent_start, miss_extent_length); | |
824 | read_ctx->miss_extents.push_back(miss_extent); | |
825 | /* Add miss range to read extents */ | |
826 | auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent); | |
827 | read_ctx->read_extents.push_back(miss_extent_buf); | |
828 | extent_offset += miss_extent_length; | |
829 | } | |
830 | } | |
831 | ||
20effc67 TL |
832 | ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents |
833 | << ", miss_bl=" << read_ctx->miss_bl << dendl; | |
f67539c2 TL |
834 | |
835 | complete_read(log_entries_to_read, bls_to_read, ctx); | |
836 | } | |
837 | ||
838 | template <typename I> | |
839 | void AbstractWriteLog<I>::write(Extents &&image_extents, | |
840 | bufferlist&& bl, | |
841 | int fadvise_flags, | |
842 | Context *on_finish) { | |
843 | CephContext *cct = m_image_ctx.cct; | |
844 | ||
845 | ldout(cct, 20) << "aio_write" << dendl; | |
846 | ||
847 | utime_t now = ceph_clock_now(); | |
848 | m_perfcounter->inc(l_librbd_pwl_wr_req, 1); | |
849 | ||
850 | ceph_assert(m_initialized); | |
851 | ||
20effc67 TL |
852 | /* Split image extents larger than 1M. This isn't strictly necessary but |
853 | * makes libpmemobj allocator's job easier and reduces pmemobj_defrag() cost. | |
854 | * We plan to manage pmem space and allocation by ourselves in the future. | |
f67539c2 TL |
855 | */ |
856 | Extents split_image_extents; | |
857 | uint64_t max_extent_size = get_max_extent(); | |
858 | if (max_extent_size != 0) { | |
859 | for (auto extent : image_extents) { | |
860 | if (extent.second > max_extent_size) { | |
861 | uint64_t off = extent.first; | |
862 | uint64_t extent_bytes = extent.second; | |
863 | for (int i = 0; extent_bytes != 0; ++i) { | |
864 | Extent _ext; | |
865 | _ext.first = off + i * max_extent_size; | |
866 | _ext.second = std::min(max_extent_size, extent_bytes); | |
867 | extent_bytes = extent_bytes - _ext.second ; | |
868 | split_image_extents.emplace_back(_ext); | |
869 | } | |
870 | } else { | |
871 | split_image_extents.emplace_back(extent); | |
872 | } | |
873 | } | |
874 | } else { | |
875 | split_image_extents = image_extents; | |
876 | } | |
877 | ||
878 | C_WriteRequestT *write_req = | |
879 | m_builder->create_write_request(*this, now, std::move(split_image_extents), | |
880 | std::move(bl), fadvise_flags, m_lock, | |
881 | m_perfcounter, on_finish); | |
882 | m_perfcounter->inc(l_librbd_pwl_wr_bytes, | |
883 | write_req->image_extents_summary.total_bytes); | |
884 | ||
885 | /* The lambda below will be called when the block guard for all | |
886 | * blocks affected by this write is obtained */ | |
887 | GuardedRequestFunctionContext *guarded_ctx = | |
888 | new GuardedRequestFunctionContext([this, | |
889 | write_req](GuardedRequestFunctionContext &guard_ctx) { | |
890 | write_req->blockguard_acquired(guard_ctx); | |
891 | alloc_and_dispatch_io_req(write_req); | |
892 | }); | |
893 | ||
894 | detain_guarded_request(write_req, guarded_ctx, false); | |
895 | } | |
896 | ||
897 | template <typename I> | |
898 | void AbstractWriteLog<I>::discard(uint64_t offset, uint64_t length, | |
899 | uint32_t discard_granularity_bytes, | |
900 | Context *on_finish) { | |
901 | CephContext *cct = m_image_ctx.cct; | |
902 | ||
903 | ldout(cct, 20) << dendl; | |
904 | ||
905 | utime_t now = ceph_clock_now(); | |
906 | m_perfcounter->inc(l_librbd_pwl_discard, 1); | |
907 | Extents discard_extents = {{offset, length}}; | |
908 | m_discard_granularity_bytes = discard_granularity_bytes; | |
909 | ||
910 | ceph_assert(m_initialized); | |
911 | ||
912 | auto *discard_req = | |
913 | new C_DiscardRequestT(*this, now, std::move(discard_extents), discard_granularity_bytes, | |
914 | m_lock, m_perfcounter, on_finish); | |
915 | ||
916 | /* The lambda below will be called when the block guard for all | |
917 | * blocks affected by this write is obtained */ | |
918 | GuardedRequestFunctionContext *guarded_ctx = | |
919 | new GuardedRequestFunctionContext([this, discard_req](GuardedRequestFunctionContext &guard_ctx) { | |
920 | discard_req->blockguard_acquired(guard_ctx); | |
921 | alloc_and_dispatch_io_req(discard_req); | |
922 | }); | |
923 | ||
924 | detain_guarded_request(discard_req, guarded_ctx, false); | |
925 | } | |
926 | ||
927 | /** | |
928 | * Aio_flush completes when all previously completed writes are | |
929 | * flushed to persistent cache. We make a best-effort attempt to also | |
930 | * defer until all in-progress writes complete, but we may not know | |
931 | * about all of the writes the application considers in-progress yet, | |
932 | * due to uncertainty in the IO submission workq (multiple WQ threads | |
933 | * may allow out-of-order submission). | |
934 | * | |
935 | * This flush operation will not wait for writes deferred for overlap | |
936 | * in the block guard. | |
937 | */ | |
938 | template <typename I> | |
939 | void AbstractWriteLog<I>::flush(io::FlushSource flush_source, Context *on_finish) { | |
940 | CephContext *cct = m_image_ctx.cct; | |
941 | ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl; | |
942 | ||
943 | if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source || | |
944 | io::FLUSH_SOURCE_WRITE_BLOCK == flush_source) { | |
945 | internal_flush(false, on_finish); | |
946 | return; | |
947 | } | |
948 | m_perfcounter->inc(l_librbd_pwl_aio_flush, 1); | |
949 | ||
950 | /* May be called even if initialization fails */ | |
951 | if (!m_initialized) { | |
952 | ldout(cct, 05) << "never initialized" << dendl; | |
953 | /* Deadlock if completed here */ | |
954 | m_image_ctx.op_work_queue->queue(on_finish, 0); | |
955 | return; | |
956 | } | |
957 | ||
958 | { | |
959 | std::shared_lock image_locker(m_image_ctx.image_lock); | |
960 | if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) { | |
961 | on_finish->complete(-EROFS); | |
962 | return; | |
963 | } | |
964 | } | |
965 | ||
966 | auto flush_req = make_flush_req(on_finish); | |
967 | ||
968 | GuardedRequestFunctionContext *guarded_ctx = | |
969 | new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) { | |
970 | ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl; | |
971 | ceph_assert(guard_ctx.cell); | |
972 | flush_req->detained = guard_ctx.state.detained; | |
973 | /* We don't call flush_req->set_cell(), because the block guard will be released here */ | |
974 | { | |
975 | DeferredContexts post_unlock; /* Do these when the lock below is released */ | |
976 | std::lock_guard locker(m_lock); | |
977 | ||
978 | if (!m_persist_on_flush && m_persist_on_write_until_flush) { | |
979 | m_persist_on_flush = true; | |
980 | ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl; | |
981 | } | |
982 | ||
983 | /* | |
984 | * Create a new sync point if there have been writes since the last | |
985 | * one. | |
986 | * | |
987 | * We do not flush the caches below the RWL here. | |
988 | */ | |
989 | flush_new_sync_point_if_needed(flush_req, post_unlock); | |
990 | } | |
991 | ||
992 | release_guarded_request(guard_ctx.cell); | |
993 | }); | |
994 | ||
995 | detain_guarded_request(flush_req, guarded_ctx, true); | |
996 | } | |
997 | ||
998 | template <typename I> | |
999 | void AbstractWriteLog<I>::writesame(uint64_t offset, uint64_t length, | |
1000 | bufferlist&& bl, int fadvise_flags, | |
1001 | Context *on_finish) { | |
1002 | CephContext *cct = m_image_ctx.cct; | |
1003 | ||
1004 | ldout(cct, 20) << "aio_writesame" << dendl; | |
1005 | ||
1006 | utime_t now = ceph_clock_now(); | |
1007 | Extents ws_extents = {{offset, length}}; | |
1008 | m_perfcounter->inc(l_librbd_pwl_ws, 1); | |
1009 | ceph_assert(m_initialized); | |
1010 | ||
1011 | /* A write same request is also a write request. The key difference is the | |
1012 | * write same data buffer is shorter than the extent of the request. The full | |
1013 | * extent will be used in the block guard, and appear in | |
1014 | * m_blocks_to_log_entries_map. The data buffer allocated for the WS is only | |
1015 | * as long as the length of the bl here, which is the pattern that's repeated | |
1016 | * in the image for the entire length of this WS. Read hits and flushing of | |
1017 | * write sames are different than normal writes. */ | |
1018 | C_WriteSameRequestT *ws_req = | |
1019 | m_builder->create_writesame_request(*this, now, std::move(ws_extents), std::move(bl), | |
1020 | fadvise_flags, m_lock, m_perfcounter, on_finish); | |
1021 | m_perfcounter->inc(l_librbd_pwl_ws_bytes, ws_req->image_extents_summary.total_bytes); | |
1022 | ||
1023 | /* The lambda below will be called when the block guard for all | |
1024 | * blocks affected by this write is obtained */ | |
1025 | GuardedRequestFunctionContext *guarded_ctx = | |
1026 | new GuardedRequestFunctionContext([this, ws_req](GuardedRequestFunctionContext &guard_ctx) { | |
1027 | ws_req->blockguard_acquired(guard_ctx); | |
1028 | alloc_and_dispatch_io_req(ws_req); | |
1029 | }); | |
1030 | ||
1031 | detain_guarded_request(ws_req, guarded_ctx, false); | |
1032 | } | |
1033 | ||
1034 | template <typename I> | |
1035 | void AbstractWriteLog<I>::compare_and_write(Extents &&image_extents, | |
1036 | bufferlist&& cmp_bl, | |
1037 | bufferlist&& bl, | |
1038 | uint64_t *mismatch_offset, | |
1039 | int fadvise_flags, | |
1040 | Context *on_finish) { | |
1041 | ldout(m_image_ctx.cct, 20) << dendl; | |
1042 | ||
1043 | utime_t now = ceph_clock_now(); | |
1044 | m_perfcounter->inc(l_librbd_pwl_cmp, 1); | |
1045 | ceph_assert(m_initialized); | |
1046 | ||
1047 | /* A compare and write request is also a write request. We only allocate | |
1048 | * resources and dispatch this write request if the compare phase | |
1049 | * succeeds. */ | |
1050 | C_WriteRequestT *cw_req = | |
1051 | m_builder->create_comp_and_write_request( | |
1052 | *this, now, std::move(image_extents), std::move(cmp_bl), std::move(bl), | |
1053 | mismatch_offset, fadvise_flags, m_lock, m_perfcounter, on_finish); | |
1054 | m_perfcounter->inc(l_librbd_pwl_cmp_bytes, cw_req->image_extents_summary.total_bytes); | |
1055 | ||
1056 | /* The lambda below will be called when the block guard for all | |
1057 | * blocks affected by this write is obtained */ | |
1058 | GuardedRequestFunctionContext *guarded_ctx = | |
1059 | new GuardedRequestFunctionContext([this, cw_req](GuardedRequestFunctionContext &guard_ctx) { | |
1060 | cw_req->blockguard_acquired(guard_ctx); | |
1061 | ||
1062 | auto read_complete_ctx = new LambdaContext( | |
1063 | [this, cw_req](int r) { | |
1064 | ldout(m_image_ctx.cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id | |
1065 | << "cw_req=" << cw_req << dendl; | |
1066 | ||
1067 | /* Compare read_bl to cmp_bl to determine if this will produce a write */ | |
1068 | buffer::list aligned_read_bl; | |
1069 | if (cw_req->cmp_bl.length() < cw_req->read_bl.length()) { | |
1070 | aligned_read_bl.substr_of(cw_req->read_bl, 0, cw_req->cmp_bl.length()); | |
1071 | } | |
1072 | if (cw_req->cmp_bl.contents_equal(cw_req->read_bl) || | |
1073 | cw_req->cmp_bl.contents_equal(aligned_read_bl)) { | |
1074 | /* Compare phase succeeds. Begin write */ | |
1075 | ldout(m_image_ctx.cct, 5) << " cw_req=" << cw_req << " compare matched" << dendl; | |
1076 | cw_req->compare_succeeded = true; | |
1077 | *cw_req->mismatch_offset = 0; | |
1078 | /* Continue with this request as a write. Blockguard release and | |
1079 | * user request completion handled as if this were a plain | |
1080 | * write. */ | |
1081 | alloc_and_dispatch_io_req(cw_req); | |
1082 | } else { | |
1083 | /* Compare phase fails. Comp-and write ends now. */ | |
1084 | ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " compare failed" << dendl; | |
1085 | /* Bufferlist doesn't tell us where they differed, so we'll have to determine that here */ | |
1086 | uint64_t bl_index = 0; | |
1087 | for (bl_index = 0; bl_index < cw_req->cmp_bl.length(); bl_index++) { | |
1088 | if (cw_req->cmp_bl[bl_index] != cw_req->read_bl[bl_index]) { | |
1089 | ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " mismatch at " << bl_index << dendl; | |
1090 | break; | |
1091 | } | |
1092 | } | |
1093 | cw_req->compare_succeeded = false; | |
1094 | *cw_req->mismatch_offset = bl_index; | |
1095 | cw_req->complete_user_request(-EILSEQ); | |
1096 | cw_req->release_cell(); | |
1097 | cw_req->complete(0); | |
1098 | } | |
1099 | }); | |
1100 | ||
1101 | /* Read phase of comp-and-write must read through RWL */ | |
1102 | Extents image_extents_copy = cw_req->image_extents; | |
1103 | read(std::move(image_extents_copy), &cw_req->read_bl, cw_req->fadvise_flags, read_complete_ctx); | |
1104 | }); | |
1105 | ||
1106 | detain_guarded_request(cw_req, guarded_ctx, false); | |
1107 | } | |
1108 | ||
1109 | template <typename I> | |
1110 | void AbstractWriteLog<I>::flush(Context *on_finish) { | |
1111 | internal_flush(false, on_finish); | |
1112 | } | |
1113 | ||
1114 | template <typename I> | |
1115 | void AbstractWriteLog<I>::invalidate(Context *on_finish) { | |
1116 | internal_flush(true, on_finish); | |
1117 | } | |
1118 | ||
1119 | template <typename I> | |
1120 | CephContext *AbstractWriteLog<I>::get_context() { | |
1121 | return m_image_ctx.cct; | |
1122 | } | |
1123 | ||
1124 | template <typename I> | |
1125 | BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req) | |
1126 | { | |
1127 | CephContext *cct = m_image_ctx.cct; | |
1128 | BlockGuardCell *cell; | |
1129 | ||
1130 | ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); | |
1131 | ldout(cct, 20) << dendl; | |
1132 | ||
1133 | int r = m_write_log_guard.detain(req.block_extent, &req, &cell); | |
1134 | ceph_assert(r>=0); | |
1135 | if (r > 0) { | |
1136 | ldout(cct, 20) << "detaining guarded request due to in-flight requests: " | |
1137 | << "req=" << req << dendl; | |
1138 | return nullptr; | |
1139 | } | |
1140 | ||
1141 | ldout(cct, 20) << "in-flight request cell: " << cell << dendl; | |
1142 | return cell; | |
1143 | } | |
1144 | ||
1145 | template <typename I> | |
1146 | BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_barrier_helper( | |
1147 | GuardedRequest &req) | |
1148 | { | |
1149 | BlockGuardCell *cell = nullptr; | |
1150 | ||
1151 | ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); | |
1152 | ldout(m_image_ctx.cct, 20) << dendl; | |
1153 | ||
1154 | if (m_barrier_in_progress) { | |
1155 | req.guard_ctx->state.queued = true; | |
1156 | m_awaiting_barrier.push_back(req); | |
1157 | } else { | |
1158 | bool barrier = req.guard_ctx->state.barrier; | |
1159 | if (barrier) { | |
1160 | m_barrier_in_progress = true; | |
1161 | req.guard_ctx->state.current_barrier = true; | |
1162 | } | |
1163 | cell = detain_guarded_request_helper(req); | |
1164 | if (barrier) { | |
1165 | /* Only non-null if the barrier acquires the guard now */ | |
1166 | m_barrier_cell = cell; | |
1167 | } | |
1168 | } | |
1169 | ||
1170 | return cell; | |
1171 | } | |
1172 | ||
1173 | template <typename I> | |
1174 | void AbstractWriteLog<I>::detain_guarded_request( | |
1175 | C_BlockIORequestT *request, | |
1176 | GuardedRequestFunctionContext *guarded_ctx, | |
1177 | bool is_barrier) | |
1178 | { | |
1179 | BlockExtent extent; | |
1180 | if (request) { | |
1181 | extent = request->image_extents_summary.block_extent(); | |
1182 | } else { | |
1183 | extent = block_extent(whole_volume_extent()); | |
1184 | } | |
1185 | auto req = GuardedRequest(extent, guarded_ctx, is_barrier); | |
1186 | BlockGuardCell *cell = nullptr; | |
1187 | ||
1188 | ldout(m_image_ctx.cct, 20) << dendl; | |
1189 | { | |
1190 | std::lock_guard locker(m_blockguard_lock); | |
1191 | cell = detain_guarded_request_barrier_helper(req); | |
1192 | } | |
1193 | if (cell) { | |
1194 | req.guard_ctx->cell = cell; | |
1195 | req.guard_ctx->complete(0); | |
1196 | } | |
1197 | } | |
1198 | ||
1199 | template <typename I> | |
1200 | void AbstractWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell) | |
1201 | { | |
1202 | CephContext *cct = m_image_ctx.cct; | |
1203 | WriteLogGuard::BlockOperations block_reqs; | |
1204 | ldout(cct, 20) << "released_cell=" << released_cell << dendl; | |
1205 | ||
1206 | { | |
1207 | std::lock_guard locker(m_blockguard_lock); | |
1208 | m_write_log_guard.release(released_cell, &block_reqs); | |
1209 | ||
1210 | for (auto &req : block_reqs) { | |
1211 | req.guard_ctx->state.detained = true; | |
1212 | BlockGuardCell *detained_cell = detain_guarded_request_helper(req); | |
1213 | if (detained_cell) { | |
1214 | if (req.guard_ctx->state.current_barrier) { | |
1215 | /* The current barrier is acquiring the block guard, so now we know its cell */ | |
1216 | m_barrier_cell = detained_cell; | |
1217 | /* detained_cell could be == released_cell here */ | |
1218 | ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl; | |
1219 | } | |
1220 | req.guard_ctx->cell = detained_cell; | |
1221 | m_work_queue.queue(req.guard_ctx); | |
1222 | } | |
1223 | } | |
1224 | ||
1225 | if (m_barrier_in_progress && (released_cell == m_barrier_cell)) { | |
1226 | ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl; | |
1227 | /* The released cell is the current barrier request */ | |
1228 | m_barrier_in_progress = false; | |
1229 | m_barrier_cell = nullptr; | |
1230 | /* Move waiting requests into the blockguard. Stop if there's another barrier */ | |
1231 | while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) { | |
1232 | auto &req = m_awaiting_barrier.front(); | |
1233 | ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl; | |
1234 | BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req); | |
1235 | if (detained_cell) { | |
1236 | req.guard_ctx->cell = detained_cell; | |
1237 | m_work_queue.queue(req.guard_ctx); | |
1238 | } | |
1239 | m_awaiting_barrier.pop_front(); | |
1240 | } | |
1241 | } | |
1242 | } | |
1243 | ||
1244 | ldout(cct, 20) << "exit" << dendl; | |
1245 | } | |
1246 | ||
1247 | template <typename I> | |
1248 | void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain, | |
1249 | bool &appending, bool isRWL) | |
1250 | { | |
1251 | const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION | |
1252 | : MAX_WRITES_PER_SYNC_POINT; | |
1253 | { | |
1254 | std::lock_guard locker(m_lock); | |
1255 | if (!appending && m_appending) { | |
1256 | /* Another thread is appending */ | |
1257 | ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl; | |
1258 | return; | |
1259 | } | |
1260 | if (m_ops_to_append.size()) { | |
1261 | appending = true; | |
1262 | m_appending = true; | |
1263 | auto last_in_batch = m_ops_to_append.begin(); | |
1264 | unsigned int ops_to_append = m_ops_to_append.size(); | |
1265 | if (ops_to_append > OPS_APPENDED) { | |
1266 | ops_to_append = OPS_APPENDED; | |
1267 | } | |
1268 | std::advance(last_in_batch, ops_to_append); | |
1269 | ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch); | |
1270 | ops_remain = true; /* Always check again before leaving */ | |
20effc67 TL |
1271 | ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", remain " |
1272 | << m_ops_to_append.size() << dendl; | |
f67539c2 TL |
1273 | } else if (isRWL) { |
1274 | ops_remain = false; | |
1275 | if (appending) { | |
1276 | appending = false; | |
1277 | m_appending = false; | |
1278 | } | |
1279 | } | |
1280 | } | |
1281 | } | |
1282 | ||
1283 | template <typename I> | |
a4b75251 | 1284 | void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req) |
f67539c2 TL |
1285 | { |
1286 | GenericLogOperations to_append(ops.begin(), ops.end()); | |
1287 | ||
a4b75251 | 1288 | schedule_append_ops(to_append, req); |
f67539c2 TL |
1289 | } |
1290 | ||
1291 | template <typename I> | |
a4b75251 | 1292 | void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req) |
f67539c2 TL |
1293 | { |
1294 | GenericLogOperations to_append { op }; | |
1295 | ||
a4b75251 | 1296 | schedule_append_ops(to_append, req); |
f67539c2 TL |
1297 | } |
1298 | ||
1299 | /* | |
1300 | * Complete a set of write ops with the result of append_op_entries. | |
1301 | */ | |
1302 | template <typename I> | |
1303 | void AbstractWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops, | |
1304 | const int result) | |
1305 | { | |
1306 | GenericLogEntries dirty_entries; | |
1307 | int published_reserves = 0; | |
1308 | ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl; | |
1309 | for (auto &op : ops) { | |
1310 | utime_t now = ceph_clock_now(); | |
1311 | auto log_entry = op->get_log_entry(); | |
1312 | log_entry->completed = true; | |
1313 | if (op->is_writing_op()) { | |
1314 | op->mark_log_entry_completed(); | |
1315 | dirty_entries.push_back(log_entry); | |
1316 | } | |
1317 | if (log_entry->is_write_entry()) { | |
1318 | release_ram(log_entry); | |
1319 | } | |
1320 | if (op->reserved_allocated()) { | |
1321 | published_reserves++; | |
1322 | } | |
1323 | { | |
1324 | std::lock_guard locker(m_lock); | |
1325 | m_unpublished_reserves -= published_reserves; | |
1326 | m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries); | |
33c7a0ef TL |
1327 | if (m_cache_state->clean && !this->m_dirty_log_entries.empty()) { |
1328 | m_cache_state->clean = false; | |
1329 | update_image_cache_state(); | |
1330 | } | |
f67539c2 TL |
1331 | } |
1332 | op->complete(result); | |
1333 | m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t, | |
a4b75251 | 1334 | op->log_append_start_time - op->dispatch_time); |
f67539c2 TL |
1335 | m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time); |
1336 | m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist, | |
1337 | utime_t(now - op->dispatch_time).to_nsec(), | |
1338 | log_entry->ram_entry.write_bytes); | |
a4b75251 | 1339 | utime_t app_lat = op->log_append_comp_time - op->log_append_start_time; |
f67539c2 TL |
1340 | m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat); |
1341 | m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(), | |
1342 | log_entry->ram_entry.write_bytes); | |
a4b75251 | 1343 | m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time); |
f67539c2 TL |
1344 | } |
1345 | // New entries may be flushable | |
1346 | { | |
1347 | std::lock_guard locker(m_lock); | |
1348 | wake_up(); | |
1349 | } | |
1350 | } | |
1351 | ||
/**
 * Dispatch as many deferred writes as possible
 *
 * At most one thread acts as dispatcher at a time (the
 * m_dispatching_deferred_ops flag, guarded by m_lock). A request is popped
 * off the front of m_deferred_ios only after its alloc_resources()
 * succeeds; the first allocation failure stops dispatching. Dispatch of
 * all but the last allocated request is pushed to the work queue; the last
 * one is dispatched inline at the end.
 */
template <typename I>
void AbstractWriteLog<I>::dispatch_deferred_writes(void)
{
  C_BlockIORequestT *front_req = nullptr;     /* req still on front of deferred list */
  C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */
  bool allocated = false; /* front_req allocate succeeded */
  bool cleared_dispatching_flag = false;

  /* If we can't become the dispatcher, we'll exit */
  {
    std::lock_guard locker(m_lock);
    if (m_dispatching_deferred_ops ||
        !m_deferred_ios.size()) {
      return;
    }
    m_dispatching_deferred_ops = true;
  }

  /* There are ops to dispatch, and this should be the only thread dispatching them */
  {
    std::lock_guard deferred_dispatch(m_deferred_dispatch_lock);
    do {
      {
        std::lock_guard locker(m_lock);
        ceph_assert(m_dispatching_deferred_ops);
        if (allocated) {
          /* On the 2..n-1 th time we get lock, front_req->alloc_resources() will
           * have succeeded, and we'll need to pop it off the deferred ops list
           * here. */
          ceph_assert(front_req);
          ceph_assert(!allocated_req);
          m_deferred_ios.pop_front();
          allocated_req = front_req;
          front_req = nullptr;
          allocated = false;
        }
        ceph_assert(!allocated);
        if (!allocated && front_req) {
          /* front_req->alloc_resources() failed on the last iteration.
           * We'll stop dispatching. */
          wake_up();
          front_req = nullptr;
          ceph_assert(!cleared_dispatching_flag);
          m_dispatching_deferred_ops = false;
          cleared_dispatching_flag = true;
        } else {
          ceph_assert(!front_req);
          if (m_deferred_ios.size()) {
            /* New allocation candidate */
            front_req = m_deferred_ios.front();
          } else {
            /* Deferred list drained; stop dispatching */
            ceph_assert(!cleared_dispatching_flag);
            m_dispatching_deferred_ops = false;
            cleared_dispatching_flag = true;
          }
        }
      }
      /* Try allocating for front_req before we decide what to do with allocated_req
       * (if any) */
      if (front_req) {
        ceph_assert(!cleared_dispatching_flag);
        allocated = front_req->alloc_resources();
      }
      if (allocated_req && front_req && allocated) {
        /* Push dispatch of the first allocated req to a wq */
        m_work_queue.queue(new LambdaContext(
          [allocated_req](int r) {
            allocated_req->dispatch();
          }), 0);
        allocated_req = nullptr;
      }
      /* At most one of allocated_req / (front_req && allocated) is pending */
      ceph_assert(!(allocated_req && front_req && allocated));

      /* Continue while we're still considering the front of the deferred ops list */
    } while (front_req);
    ceph_assert(!allocated);
  }
  ceph_assert(cleared_dispatching_flag);

  /* If any deferred requests were allocated, the last one will still be in allocated_req */
  if (allocated_req) {
    allocated_req->dispatch();
  }
}
1439 | ||
1440 | /** | |
1441 | * Returns the lanes used by this write, and attempts to dispatch the next | |
1442 | * deferred write | |
1443 | */ | |
1444 | template <typename I> | |
1445 | void AbstractWriteLog<I>::release_write_lanes(C_BlockIORequestT *req) | |
1446 | { | |
1447 | { | |
1448 | std::lock_guard locker(m_lock); | |
1449 | m_free_lanes += req->image_extents.size(); | |
1450 | } | |
1451 | dispatch_deferred_writes(); | |
1452 | } | |
1453 | ||
/**
 * Attempts to allocate log resources for a write. Write is dispatched if
 * resources are available, or queued if they aren't.
 *
 * Requests normally queue behind any already-deferred IOs so resources are
 * granted in order; internal flush requests bypass the queue.
 */
template <typename I>
void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
{
  bool dispatch_here = false;

  {
    /* If there are already deferred writes, queue behind them for resources */
    {
      std::lock_guard locker(m_lock);
      dispatch_here = m_deferred_ios.empty();
      // Only flush req's total_bytes is the max uint64
      /* NOTE(review): the cast assumes only a C_FlushRequestT carries the
       * max-uint64 total_bytes sentinel — confirm no other request type can
       * produce this value. Internal flushes dispatch immediately so a sync
       * point can always be persisted. */
      if (req->image_extents_summary.total_bytes ==
            std::numeric_limits<uint64_t>::max() &&
          static_cast<C_FlushRequestT *>(req)->internal == true) {
        dispatch_here = true;
      }
    }
    if (dispatch_here) {
      /* Dispatch only if resources can be allocated right now */
      dispatch_here = req->alloc_resources();
    }
    if (dispatch_here) {
      ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
      req->dispatch();
    } else {
      /* Resources unavailable (or queue non-empty): defer the request */
      req->deferred();
      {
        std::lock_guard locker(m_lock);
        m_deferred_ios.push_back(req);
      }
      ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
      dispatch_deferred_writes();
    }
  }
}
1492 | ||
/* Check whether the resources this request needs (replication lanes, free
 * log entries, and cache buffer bytes) are available, and reserve them.
 *
 * Runs in three phases: a preliminary availability check under m_lock, an
 * implementation-specific buffer reservation (reserve_cache, outside the
 * lock), then a re-check and commit under m_lock since the lock was
 * dropped in between. Returns true when the reservation was committed.
 * A failure caused by entry/buffer exhaustion (not lane throttling) marks
 * m_alloc_failed_since_retire so flushing/retiring is expedited. */
template <typename I>
bool AbstractWriteLog<I>::check_allocation(
    C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied,
    uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries,
    uint32_t num_unpublished_reserves) {
  bool alloc_succeeds = true;
  bool no_space = false;
  {
    std::lock_guard locker(m_lock);
    if (m_free_lanes < num_lanes) {
      ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
                                 << num_lanes
                                 << ", have " << m_free_lanes << ") "
                                 << *req << dendl;
      alloc_succeeds = false;
      /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
    }
    if (m_free_log_entries < num_log_entries) {
      ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
                                 << num_log_entries
                                 << ", have " << m_free_log_entries << ") "
                                 << *req << dendl;
      alloc_succeeds = false;
      no_space = true; /* Entries must be retired */
    }
    /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
    if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
      ldout(m_image_ctx.cct, 20) << "Waiting for allocation cap (cap="
                                 << m_bytes_allocated_cap
                                 << ", allocated=" << m_bytes_allocated
                                 << ") in write [" << *req << "]" << dendl;
      alloc_succeeds = false;
      no_space = true; /* Entries must be retired */
    }
  }

  if (alloc_succeeds) {
    /* Subclass-specific buffer reservation; may clear alloc_succeeds and/or
     * set no_space. */
    reserve_cache(req, alloc_succeeds, no_space);
  }

  if (alloc_succeeds) {
    std::lock_guard locker(m_lock);
    /* We need one free log entry per extent (each is a separate entry), and
     * one free "lane" for remote replication. */
    if ((m_free_lanes >= num_lanes) &&
        (m_free_log_entries >= num_log_entries) &&
        (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) {
      /* Conditions re-checked because m_lock was dropped above; commit the
       * reservation atomically here. */
      m_free_lanes -= num_lanes;
      m_free_log_entries -= num_log_entries;
      m_unpublished_reserves += num_unpublished_reserves;
      m_bytes_allocated += bytes_allocated;
      m_bytes_cached += bytes_cached;
      m_bytes_dirty += bytes_dirtied;
    } else {
      alloc_succeeds = false;
    }
  }

  if (!alloc_succeeds && no_space) {
    /* Expedite flushing and/or retiring */
    std::lock_guard locker(m_lock);
    m_alloc_failed_since_retire = true;
    m_last_alloc_fail = ceph_clock_now();
  }

  return alloc_succeeds;
}
1560 | ||
1561 | template <typename I> | |
1562 | C_FlushRequest<AbstractWriteLog<I>>* AbstractWriteLog<I>::make_flush_req(Context *on_finish) { | |
1563 | utime_t flush_begins = ceph_clock_now(); | |
1564 | bufferlist bl; | |
1565 | auto *flush_req = | |
1566 | new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}), | |
1567 | std::move(bl), 0, m_lock, m_perfcounter, on_finish); | |
1568 | ||
1569 | return flush_req; | |
1570 | } | |
1571 | ||
/* Schedule a process_work() pass on the work queue (m_lock must be held).
 *
 * m_wake_up_requested records that new work arrived (possibly while a pass
 * is running); m_wake_up_scheduled ensures at most one process_work()
 * context is queued at a time. Disabled entirely once shutdown flushing
 * completes. */
template <typename I>
void AbstractWriteLog<I>::wake_up() {
  CephContext *cct = m_image_ctx.cct;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (!m_wake_up_enabled) {
    // wake_up is disabled during shutdown after flushing completes
    ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl;
    return;
  }

  /* Already both requested and scheduled — nothing more to record */
  if (m_wake_up_requested && m_wake_up_scheduled) {
    return;
  }

  ldout(cct, 20) << dendl;

  /* Wake-up can be requested while it's already scheduled */
  m_wake_up_requested = true;

  /* Wake-up cannot be scheduled if it's already scheduled */
  if (m_wake_up_scheduled) {
    return;
  }
  m_wake_up_scheduled = true;
  m_async_process_work++;          /* tracked for shutdown accounting */
  m_async_op_tracker.start_op();
  m_work_queue.queue(new LambdaContext(
    [this](int r) {
      process_work();
      m_async_op_tracker.finish_op();
      m_async_process_work--;
    }), 0);
}
1606 | ||
1607 | template <typename I> | |
1608 | bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) { | |
1609 | CephContext *cct = m_image_ctx.cct; | |
1610 | ||
1611 | ldout(cct, 20) << "" << dendl; | |
1612 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1613 | ||
1614 | if (m_invalidating) { | |
1615 | return true; | |
1616 | } | |
1617 | ||
1618 | /* For OWB we can flush entries with the same sync gen number (write between | |
1619 | * aio_flush() calls) concurrently. Here we'll consider an entry flushable if | |
1620 | * its sync gen number is <= the lowest sync gen number carried by all the | |
1621 | * entries currently flushing. | |
1622 | * | |
1623 | * If the entry considered here bears a sync gen number lower than a | |
1624 | * previously flushed entry, the application had to have submitted the write | |
1625 | * bearing the higher gen number before the write with the lower gen number | |
1626 | * completed. So, flushing these concurrently is OK. | |
1627 | * | |
1628 | * If the entry considered here bears a sync gen number higher than a | |
1629 | * currently flushing entry, the write with the lower gen number may have | |
1630 | * completed to the application before the write with the higher sync gen | |
1631 | * number was submitted, and the application may rely on that completion | |
1632 | * order for volume consistency. In this case the entry will not be | |
1633 | * considered flushable until all the entries bearing lower sync gen numbers | |
1634 | * finish flushing. | |
1635 | */ | |
1636 | ||
1637 | if (m_flush_ops_in_flight && | |
1638 | (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) { | |
1639 | return false; | |
1640 | } | |
1641 | ||
1642 | return (log_entry->can_writeback() && | |
1643 | (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) && | |
1644 | (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT)); | |
1645 | } | |
1646 | ||
1647 | template <typename I> | |
20effc67 TL |
1648 | void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry, |
1649 | GuardedRequestFunctionContext *guarded_ctx) { | |
1650 | ldout(m_image_ctx.cct, 20) << dendl; | |
f67539c2 | 1651 | |
20effc67 TL |
1652 | BlockExtent extent; |
1653 | if (log_entry->is_sync_point()) { | |
1654 | extent = block_extent(whole_volume_extent()); | |
1655 | } else { | |
1656 | extent = log_entry->ram_entry.block_extent(); | |
f67539c2 | 1657 | } |
20effc67 TL |
1658 | |
1659 | auto req = GuardedRequest(extent, guarded_ctx, false); | |
1660 | BlockGuardCell *cell = nullptr; | |
1661 | ||
1662 | { | |
1663 | std::lock_guard locker(m_flush_guard_lock); | |
1664 | m_flush_guard.detain(req.block_extent, &req, &cell); | |
1665 | } | |
1666 | if (cell) { | |
1667 | req.guard_ctx->cell = cell; | |
1668 | m_image_ctx.op_work_queue->queue(req.guard_ctx, 0); | |
1669 | } | |
1670 | } | |
1671 | ||
/* Build the completion chain for writing back one log entry.
 *
 * Returns a Context that, when completed with the writeback result:
 *   1. releases this entry's flush block guard cell (re-detaining and
 *      queueing any requests that were blocked behind it),
 *   2. flushes through the lower cache layer (aio_flush) on success,
 *   3. updates dirty/in-flight accounting under m_lock — on error the
 *      entry is pushed back to the FRONT of the dirty list for retry.
 * The contexts are built innermost (final accounting) first. */
template <typename I>
Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
                                                    bool invalidating) {
  ldout(m_image_ctx.cct, 20) << "" << dendl;

  /* Flush write completion action */
  utime_t writeback_start_time = ceph_clock_now();
  Context *ctx = new LambdaContext(
    [this, log_entry, writeback_start_time, invalidating](int r) {
      utime_t writeback_comp_time = ceph_clock_now();
      m_perfcounter->tinc(l_librbd_pwl_writeback_latency,
                          writeback_comp_time - writeback_start_time);
      {
        std::lock_guard locker(m_lock);
        if (r < 0) {
          /* Failed writeback: requeue at the front so it retries first */
          lderr(m_image_ctx.cct) << "failed to flush log entry"
                                 << cpp_strerror(r) << dendl;
          m_dirty_log_entries.push_front(log_entry);
        } else {
          ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
          log_entry->set_flushed(true);
          m_bytes_dirty -= log_entry->bytes_dirty();
          sync_point_writer_flushed(log_entry->get_sync_point_entry());
          ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
                                     << " invalidating=" << invalidating
                                     << dendl;
        }
        m_flush_ops_in_flight -= 1;
        m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes;
        wake_up();
      }
    });
  /* Flush through lower cache before completing */
  ctx = new LambdaContext(
    [this, ctx, log_entry](int r) {
      {
        /* Release this entry's flush guard cell; requests that were queued
         * behind it are re-detained and dispatched if no longer blocked */
        WriteLogGuard::BlockOperations block_reqs;
        BlockGuardCell *detained_cell = nullptr;

        std::lock_guard locker{m_flush_guard_lock};
        m_flush_guard.release(log_entry->m_cell, &block_reqs);

        for (auto &req : block_reqs) {
          m_flush_guard.detain(req.block_extent, &req, &detained_cell);
          if (detained_cell) {
            req.guard_ctx->cell = detained_cell;
            m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
          }
        }
      }

      if (r < 0) {
        lderr(m_image_ctx.cct) << "failed to flush log entry"
                               << cpp_strerror(r) << dendl;
        ctx->complete(r);
      } else {
        m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
      }
    });
  return ctx;
}
1734 | ||
/* Move flushable entries from the front of the dirty list into writeback.
 *
 * Under m_entry_reader_lock (shared) and m_lock, takes up to
 * IN_FLIGHT_FLUSH_WRITE_LIMIT entries per pass — stopping early at the
 * first entry can_flush_entry() rejects, so dirty-list order is preserved —
 * accounts them as in-flight, then hands the batch to
 * construct_flush_entries(). When the dirty list is drained and no flush
 * ops remain in flight, marks the cache clean and completes any contexts
 * waiting on flush completion (outside the lock). */
template <typename I>
void AbstractWriteLog<I>::process_writeback_dirty_entries() {
  CephContext *cct = m_image_ctx.cct;
  bool all_clean = false;
  int flushed = 0;
  bool has_write_entry = false;

  ldout(cct, 20) << "Look for dirty entries" << dendl;
  {
    DeferredContexts post_unlock;
    GenericLogEntries entries_to_flush;

    std::shared_lock entry_reader_locker(m_entry_reader_lock);
    std::lock_guard locker(m_lock);
    while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
      if (m_shutting_down) {
        ldout(cct, 5) << "Flush during shutdown supressed" << dendl;
        /* Do flush complete only when all flush ops are finished */
        all_clean = !m_flush_ops_in_flight;
        break;
      }
      if (m_dirty_log_entries.empty()) {
        ldout(cct, 20) << "Nothing new to flush" << dendl;
        /* Do flush complete only when all flush ops are finished */
        all_clean = !m_flush_ops_in_flight;
        if (!m_cache_state->clean && all_clean) {
          m_cache_state->clean = true;
          update_image_cache_state();
        }
        break;
      }

      auto candidate = m_dirty_log_entries.front();
      bool flushable = can_flush_entry(candidate);
      if (flushable) {
        entries_to_flush.push_back(candidate);
        flushed++;
        if (!has_write_entry)
          has_write_entry = candidate->is_write_entry();
        m_dirty_log_entries.pop_front();

        // To track candidate, we should add m_flush_ops_in_flight in here
        {
          /* Maintain the lowest sync gen number among in-flight flushes;
           * can_flush_entry() gates later entries on it */
          if (!m_flush_ops_in_flight ||
              (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
            m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number;
          }
          m_flush_ops_in_flight += 1;
          /* For write same this is the bytes affected by the flush op, not the bytes transferred */
          m_flush_bytes_in_flight += candidate->ram_entry.write_bytes;
        }
      } else {
        ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
        break;
      }
    }

    construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
  }

  if (all_clean) {
    /* All flushing complete, drain outside lock */
    Contexts flush_contexts;
    {
      std::lock_guard locker(m_lock);
      flush_contexts.swap(m_flush_complete_contexts);
    }
    finish_contexts(m_image_ctx.cct, flush_contexts, 0);
  }
}
1805 | ||
/* Returns true if the specified SyncPointLogEntry is considered flushed, and
 * the log will be updated to reflect this.
 *
 * A sync point counts as flushed when all its writes have been written
 * back, it is completed, its prior sync point is flushed, and a next sync
 * point exists. The check then cascades to the next sync point on the work
 * queue; only the last sync point in the flushed chain persists the
 * flushed gen number (persist_last_flushed_sync_gen). Caller holds m_lock. */
template <typename I>
bool AbstractWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry)
{
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(log_entry);

  if ((log_entry->writes_flushed == log_entry->writes) &&
      log_entry->completed && log_entry->prior_sync_point_flushed &&
      log_entry->next_sync_point_entry) {
    ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point="
                               << *log_entry << dendl;
    log_entry->next_sync_point_entry->prior_sync_point_flushed = true;
    /* Don't move the flushed sync gen num backwards. */
    if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) {
      m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number;
    }
    m_async_op_tracker.start_op();
    /* Cascade to the next sync point off-thread (re-acquiring m_lock) */
    m_work_queue.queue(new LambdaContext(
      [this, next = std::move(log_entry->next_sync_point_entry)](int r) {
        bool handled_by_next;
        {
          std::lock_guard locker(m_lock);
          handled_by_next = handle_flushed_sync_point(std::move(next));
        }
        if (!handled_by_next) {
          persist_last_flushed_sync_gen();
        }
        m_async_op_tracker.finish_op();
      }));
    return true;
  }
  return false;
}
1841 | ||
1842 | template <typename I> | |
1843 | void AbstractWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry) | |
1844 | { | |
1845 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1846 | ceph_assert(log_entry); | |
1847 | log_entry->writes_flushed++; | |
1848 | ||
1849 | /* If this entry might be completely flushed, look closer */ | |
1850 | if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) { | |
1851 | ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point=" | |
1852 | << *log_entry << dendl; | |
1853 | handle_flushed_sync_point(log_entry); | |
1854 | } | |
1855 | } | |
1856 | ||
1857 | /* Make a new sync point and flush the previous during initialization, when there may or may | |
1858 | * not be a previous sync point */ | |
1859 | template <typename I> | |
1860 | void AbstractWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) { | |
1861 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1862 | ceph_assert(!m_initialized); /* Don't use this after init */ | |
1863 | ||
1864 | if (!m_current_sync_point) { | |
1865 | /* First sync point since start */ | |
1866 | new_sync_point(later); | |
1867 | } else { | |
1868 | flush_new_sync_point(nullptr, later); | |
1869 | } | |
1870 | } | |
1871 | ||
/**
 * Begin a new sync point
 *
 * Advances m_current_sync_gen, installs a new SyncPoint as
 * m_current_sync_point, chains it to the previous one (if any), and
 * defers activation of the old sync point's prior-persisted gather until
 * after m_lock is released. Caller must hold m_lock.
 */
template <typename I>
void AbstractWriteLog<I>::new_sync_point(DeferredContexts &later) {
  CephContext *cct = m_image_ctx.cct;
  std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point;
  std::shared_ptr<SyncPoint> new_sync_point;
  ldout(cct, 20) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  /* The first time this is called, if this is a newly created log,
   * this makes the first sync gen number we'll use 1. On the first
   * call for a re-opened log m_current_sync_gen will be the highest
   * gen number from all the sync point entries found in the re-opened
   * log, and this advances to the next sync gen number. */
  ++m_current_sync_gen;

  new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct);
  m_current_sync_point = new_sync_point;

  /* If this log has been re-opened, old_sync_point will initially be
   * nullptr, but m_current_sync_gen may not be zero. */
  if (old_sync_point) {
    new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num);
    m_perfcounter->hinc(l_librbd_pwl_syncpoint_hist,
                        old_sync_point->log_entry->writes,
                        old_sync_point->log_entry->bytes);
    /* This sync point will acquire no more sub-ops. Activation needs
     * to acquire m_lock, so defer to later*/
    later.add(new LambdaContext(
      [old_sync_point](int r) {
        old_sync_point->prior_persisted_gather_activate();
      }));
  }

  new_sync_point->prior_persisted_gather_set_finisher();

  if (old_sync_point) {
    ldout(cct,6) << "new sync point = [" << *m_current_sync_point
                 << "], prior = [" << *old_sync_point << "]" << dendl;
  } else {
    ldout(cct,6) << "first sync point = [" << *m_current_sync_point
                 << "]" << dendl;
  }
}
1919 | ||
/* Start a new sync point and arrange for flush_req to append/persist the
 * one being retired.
 *
 * If no flush request was supplied (nullptr), an internal one is created
 * (tracked via m_async_null_flush_finish / m_async_op_tracker). The flush
 * request is dispatched once the retired sync point's persist gather
 * completes, and itself completes when that sync point has persisted.
 * Caller must hold m_lock. */
template <typename I>
void AbstractWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req,
                                               DeferredContexts &later) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (!flush_req) {
    /* No caller-supplied flush: create an internal request to persist the
     * retiring sync point */
    m_async_null_flush_finish++;
    m_async_op_tracker.start_op();
    Context *flush_ctx = new LambdaContext([this](int r) {
      m_async_null_flush_finish--;
      m_async_op_tracker.finish_op();
    });
    flush_req = make_flush_req(flush_ctx);
    flush_req->internal = true;
  }

  /* Add a new sync point. */
  new_sync_point(later);
  std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point;
  ceph_assert(to_append);

  /* This flush request will append/persist the (now) previous sync point */
  flush_req->to_append = to_append;

  /* When the m_sync_point_persist Gather completes this sync point can be
   * appended. The only sub for this Gather is the finisher Context for
   * m_prior_log_entries_persisted, which records the result of the Gather in
   * the sync point, and completes. TODO: Do we still need both of these
   * Gathers?*/
  Context * ctx = new LambdaContext([this, flush_req](int r) {
    ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req
                               << " sync point =" << flush_req->to_append
                               << ". Ready to persist." << dendl;
    alloc_and_dispatch_io_req(flush_req);
  });
  to_append->persist_gather_set_finisher(ctx);

  /* The m_sync_point_persist Gather has all the subs it will ever have, and
   * now has its finisher. If the sub is already complete, activation will
   * complete the Gather. The finisher will acquire m_lock, so we'll activate
   * this when we release m_lock.*/
  later.add(new LambdaContext([to_append](int r) {
    to_append->persist_gather_activate();
  }));

  /* The flush request completes when the sync point persists */
  to_append->add_in_on_persisted_ctxs(flush_req);
}
1968 | ||
1969 | template <typename I> | |
1970 | void AbstractWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, | |
1971 | DeferredContexts &later) { | |
1972 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
1973 | ||
1974 | /* If there have been writes since the last sync point ... */ | |
1975 | if (m_current_sync_point->log_entry->writes) { | |
1976 | flush_new_sync_point(flush_req, later); | |
1977 | } else { | |
1978 | /* There have been no writes to the current sync point. */ | |
1979 | if (m_current_sync_point->earlier_sync_point) { | |
1980 | /* If previous sync point hasn't completed, complete this flush | |
1981 | * with the earlier sync point. No alloc or dispatch needed. */ | |
1982 | m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req); | |
1983 | } else { | |
1984 | /* The previous sync point has already completed and been | |
1985 | * appended. The current sync point has no writes, so this flush | |
1986 | * has nothing to wait for. This flush completes now. */ | |
1987 | later.add(flush_req); | |
1988 | } | |
1989 | } | |
1990 | } | |
1991 | ||
/*
 * RWL internal flush - will actually flush the RWL.
 *
 * User flushes should arrive at aio_flush(), and only flush prior
 * writes to all log replicas.
 *
 * Librbd internal flushes will arrive at flush(invalidate=false,
 * discard=false), and traverse the block guard to ensure in-flight writes are
 * flushed.
 */
template <typename I>
void AbstractWriteLog<I>::flush_dirty_entries(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  bool all_clean;
  bool flushing;
  bool stop_flushing;

  {
    std::lock_guard locker(m_lock);
    flushing = (0 != m_flush_ops_in_flight);
    all_clean = m_dirty_log_entries.empty();
    if (!m_cache_state->clean && all_clean && !flushing) {
      m_cache_state->clean = true;
      update_image_cache_state();
    }
    stop_flushing = (m_shutting_down);
  }

  if (!flushing && (all_clean || stop_flushing)) {
    /* Complete without holding m_lock */
    if (all_clean) {
      ldout(cct, 20) << "no dirty entries" << dendl;
    } else {
      ldout(cct, 5) << "flush during shutdown suppressed" << dendl;
    }
    on_finish->complete(0);
  } else {
    if (all_clean) {
      ldout(cct, 5) << "flush ops still in progress" << dendl;
    } else {
      ldout(cct, 20) << "dirty entries remain" << dendl;
    }
    std::lock_guard locker(m_lock);
    /* on_finish can't be completed yet: re-invoke this check when flushing
     * progresses, via the flush-complete callback list */
    m_flush_complete_contexts.push_back(new LambdaContext(
      [this, on_finish](int r) {
        flush_dirty_entries(on_finish);
      }));
    wake_up();
  }
}
2043 | ||
/* Flush (and optionally invalidate) the whole write log.
 *
 * Detains a whole-volume barrier request on the block guard so no other op
 * can overlap, then (inside the guard) builds a context chain in reverse
 * execution order:
 *   flush dirty entries -> (invalidate: retire all entries | else: flush
 *   lower layer) -> complete on_finish and release the guard cell.
 * A sync point is flushed first so the log's last entry is a sync point. */
template <typename I>
void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
  ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl;

  if (m_perfcounter) {
    if (invalidate) {
      m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1);
    } else {
      m_perfcounter->inc(l_librbd_pwl_internal_flush, 1);
    }
  }

  /* May be called even if initialization fails */
  if (!m_initialized) {
    ldout(m_image_ctx.cct, 05) << "never initialized" << dendl;
    /* Deadlock if completed here */
    m_image_ctx.op_work_queue->queue(on_finish, 0);
    return;
  }

  /* Flush/invalidate must pass through block guard to ensure all layers of
   * cache are consistently flush/invalidated. This ensures no in-flight write leaves
   * some layers with valid regions, which may later produce inconsistent read
   * results. */
  GuardedRequestFunctionContext *guarded_ctx =
    new GuardedRequestFunctionContext(
      [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) {
        DeferredContexts on_exit;
        ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
        ceph_assert(guard_ctx.cell);

        /* Final step: report completion and release the guard cell */
        Context *ctx = new LambdaContext(
          [this, cell=guard_ctx.cell, invalidate, on_finish](int r) {
            std::lock_guard locker(m_lock);
            m_invalidating = false;
            ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate="
                                      << invalidate << ")" << dendl;
            if (m_log_entries.size()) {
              ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
                                        << m_log_entries.size()
                                        << ", front()=" << *m_log_entries.front()
                                        << dendl;
            }
            if (invalidate) {
              ceph_assert(m_log_entries.size() == 0);
            }
            ceph_assert(m_dirty_log_entries.size() == 0);
            m_image_ctx.op_work_queue->queue(on_finish, r);
            release_guarded_request(cell);
          });
        /* Middle step: either retire everything (invalidate) or flush the
         * layer below */
        ctx = new LambdaContext(
          [this, ctx, invalidate](int r) {
            Context *next_ctx = ctx;
            ldout(m_image_ctx.cct, 6) << "flush_dirty_entries finished" << dendl;
            if (r < 0) {
              /* Override on_finish status with this error */
              next_ctx = new LambdaContext([r, ctx](int _r) {
                ctx->complete(r);
              });
            }
            if (invalidate) {
              {
                std::lock_guard locker(m_lock);
                ceph_assert(m_dirty_log_entries.size() == 0);
                ceph_assert(!m_invalidating);
                ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl;
                m_invalidating = true;
              }
              /* Discards all RWL entries */
              while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { }
              next_ctx->complete(0);
            } else {
              {
                std::lock_guard locker(m_lock);
                ceph_assert(m_dirty_log_entries.size() == 0);
                ceph_assert(!m_invalidating);
              }
              m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
            }
          });
        /* First step: write back all dirty entries */
        ctx = new LambdaContext(
          [this, ctx](int r) {
            flush_dirty_entries(ctx);
          });
        std::lock_guard locker(m_lock);
        /* Even if we're throwing everything away, but we want the last entry to
         * be a sync point so we can cleanly resume.
         *
         * Also, the blockguard only guarantees the replication of this op
         * can't overlap with prior ops. It doesn't guarantee those are all
         * completed and eligible for flush & retire, which we require here.
         */
        auto flush_req = make_flush_req(ctx);
        flush_new_sync_point_if_needed(flush_req, on_exit);
      });
  detain_guarded_request(nullptr, guarded_ctx, true);
}
2141 | ||
2142 | template <typename I> | |
2143 | void AbstractWriteLog<I>::add_into_log_map(GenericWriteLogEntries &log_entries, | |
2144 | C_BlockIORequestT *req) { | |
2145 | req->copy_cache(); | |
2146 | m_blocks_to_log_entries.add_log_entries(log_entries); | |
2147 | } | |
2148 | ||
2149 | template <typename I> | |
2150 | bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) { | |
2151 | CephContext *cct = m_image_ctx.cct; | |
2152 | ||
2153 | ldout(cct, 20) << dendl; | |
2154 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
20effc67 | 2155 | ceph_assert(log_entry); |
f67539c2 TL |
2156 | return log_entry->can_retire(); |
2157 | } | |
2158 | ||
a4b75251 TL |
2159 | template <typename I> |
2160 | void AbstractWriteLog<I>::check_image_cache_state_clean() { | |
2161 | ceph_assert(m_deferred_ios.empty()); | |
20effc67 | 2162 | ceph_assert(m_ops_to_append.empty()); |
a4b75251 TL |
2163 | ceph_assert(m_async_flush_ops == 0); |
2164 | ceph_assert(m_async_append_ops == 0); | |
2165 | ceph_assert(m_dirty_log_entries.empty()); | |
2166 | ceph_assert(m_ops_to_flush.empty()); | |
2167 | ceph_assert(m_flush_ops_in_flight == 0); | |
2168 | ceph_assert(m_flush_bytes_in_flight == 0); | |
2169 | ceph_assert(m_bytes_dirty == 0); | |
2170 | ceph_assert(m_work_queue.empty()); | |
2171 | } | |
2172 | ||
f67539c2 TL |
2173 | } // namespace pwl |
2174 | } // namespace cache | |
2175 | } // namespace librbd | |
2176 | ||
2177 | template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>; |