]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/cache/pwl/LogOperation.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / librbd / cache / pwl / LogOperation.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <iostream>
5 #include "LogOperation.h"
6 #include "librbd/cache/pwl/Types.h"
7
8 #define dout_subsys ceph_subsys_rbd_pwl
9 #undef dout_prefix
10 #define dout_prefix *_dout << "librbd::cache::pwl::LogOperation: " << this \
11 << " " << __func__ << ": "
12
13 namespace librbd {
14 namespace cache {
15 namespace pwl {
16
17 GenericLogOperation::GenericLogOperation(utime_t dispatch_time,
18 PerfCounters *perfcounter)
19 : m_perfcounter(perfcounter), dispatch_time(dispatch_time) {
20 }
21
22 std::ostream& GenericLogOperation::format(std::ostream &os) const {
23 os << "dispatch_time=[" << dispatch_time
24 << "], buf_persist_start_time=[" << buf_persist_start_time
25 << "], buf_persist_comp_time=[" << buf_persist_comp_time
26 << "], log_append_start_time=[" << log_append_start_time
27 << "], log_append_comp_time=[" << log_append_comp_time << "]";
28 return os;
29 }
30
31 std::ostream &operator<<(std::ostream &os,
32 const GenericLogOperation &op) {
33 return op.format(os);
34 }
35
36 SyncPointLogOperation::SyncPointLogOperation(ceph::mutex &lock,
37 std::shared_ptr<SyncPoint> sync_point,
38 utime_t dispatch_time,
39 PerfCounters *perfcounter,
40 CephContext *cct)
41 : GenericLogOperation(dispatch_time, perfcounter), m_cct(cct), m_lock(lock),
42 sync_point(sync_point) {
43 }
44
45 SyncPointLogOperation::~SyncPointLogOperation() { }
46
47 std::ostream &SyncPointLogOperation::format(std::ostream &os) const {
48 os << "(Sync Point) ";
49 GenericLogOperation::format(os);
50 os << ", sync_point=[" << *sync_point << "]";
51 return os;
52 }
53
54 std::ostream &operator<<(std::ostream &os,
55 const SyncPointLogOperation &op) {
56 return op.format(os);
57 }
58
59 std::vector<Context*> SyncPointLogOperation::append_sync_point() {
60 std::vector<Context*> appending_contexts;
61 std::lock_guard locker(m_lock);
62 if (!sync_point->appending) {
63 sync_point->appending = true;
64 }
65 appending_contexts.swap(sync_point->on_sync_point_appending);
66 return appending_contexts;
67 }
68
69 void SyncPointLogOperation::clear_earlier_sync_point() {
70 std::lock_guard locker(m_lock);
71 ceph_assert(sync_point->later_sync_point);
72 ceph_assert(sync_point->later_sync_point->earlier_sync_point == sync_point);
73 sync_point->later_sync_point->earlier_sync_point = nullptr;
74 sync_point->later_sync_point = nullptr;
75 }
76
77 std::vector<Context*> SyncPointLogOperation::swap_on_sync_point_persisted() {
78 std::lock_guard locker(m_lock);
79 std::vector<Context*> persisted_contexts;
80 persisted_contexts.swap(sync_point->on_sync_point_persisted);
81 return persisted_contexts;
82 }
83
84 void SyncPointLogOperation::appending() {
85 ceph_assert(sync_point);
86 ldout(m_cct, 20) << "Sync point op=[" << *this
87 << "] appending" << dendl;
88 auto appending_contexts = append_sync_point();
89 for (auto &ctx : appending_contexts) {
90 ctx->complete(0);
91 }
92 }
93
94 void SyncPointLogOperation::complete(int result) {
95 ceph_assert(sync_point);
96 ldout(m_cct, 20) << "Sync point op =[" << *this
97 << "] completed" << dendl;
98 clear_earlier_sync_point();
99
100 /* Do append now in case completion occurred before the
101 * normal append callback executed, and to handle
102 * on_append work that was queued after the sync point
103 * entered the appending state. */
104 appending();
105 auto persisted_contexts = swap_on_sync_point_persisted();
106 for (auto &ctx : persisted_contexts) {
107 ctx->complete(result);
108 }
109 }
110
111 GenericWriteLogOperation::GenericWriteLogOperation(std::shared_ptr<SyncPoint> sync_point,
112 utime_t dispatch_time,
113 PerfCounters *perfcounter,
114 CephContext *cct)
115 : GenericLogOperation(dispatch_time, perfcounter),
116 m_lock(ceph::make_mutex(pwl::unique_lock_name(
117 "librbd::cache::pwl::GenericWriteLogOperation::m_lock", this))),
118 m_cct(cct),
119 sync_point(sync_point) {
120 }
121
122 GenericWriteLogOperation::~GenericWriteLogOperation() { }
123
124 std::ostream &GenericWriteLogOperation::format(std::ostream &os) const {
125 GenericLogOperation::format(os);
126 return os;
127 }
128
129 std::ostream &operator<<(std::ostream &os,
130 const GenericWriteLogOperation &op) {
131 return op.format(os);
132 }
133
134 /* Called when the write log operation is appending and its log position is guaranteed */
135 void GenericWriteLogOperation::appending() {
136 Context *on_append = nullptr;
137 ldout(m_cct, 20) << __func__ << " " << this << dendl;
138 {
139 std::lock_guard locker(m_lock);
140 on_append = on_write_append;
141 on_write_append = nullptr;
142 }
143 if (on_append) {
144 ldout(m_cct, 20) << __func__ << " " << this << " on_append=" << on_append << dendl;
145 on_append->complete(0);
146 }
147 }
148
149 /* Called when the write log operation is completed in all log replicas */
150 void GenericWriteLogOperation::complete(int result) {
151 appending();
152 Context *on_persist = nullptr;
153 ldout(m_cct, 20) << __func__ << " " << this << dendl;
154 {
155 std::lock_guard locker(m_lock);
156 on_persist = on_write_persist;
157 on_write_persist = nullptr;
158 }
159 if (on_persist) {
160 ldout(m_cct, 20) << __func__ << " " << this << " on_persist=" << on_persist
161 << dendl;
162 on_persist->complete(result);
163 }
164 }
165
166 WriteLogOperation::WriteLogOperation(
167 WriteLogOperationSet &set, uint64_t image_offset_bytes,
168 uint64_t write_bytes, CephContext *cct,
169 std::shared_ptr<WriteLogEntry> write_log_entry)
170 : GenericWriteLogOperation(set.sync_point, set.dispatch_time,
171 set.perfcounter, cct),
172 log_entry(write_log_entry) {
173 on_write_append = set.extent_ops_appending->new_sub();
174 on_write_persist = set.extent_ops_persist->new_sub();
175 log_entry->sync_point_entry->writes++;
176 log_entry->sync_point_entry->bytes += write_bytes;
177 }
178
179 WriteLogOperation::WriteLogOperation(WriteLogOperationSet &set,
180 uint64_t image_offset_bytes,
181 uint64_t write_bytes,
182 uint32_t data_len,
183 CephContext *cct,
184 std::shared_ptr<WriteLogEntry> writesame_log_entry)
185 : WriteLogOperation(set, image_offset_bytes, write_bytes, cct,
186 writesame_log_entry) {
187 is_writesame = true;
188 }
189
190 WriteLogOperation::~WriteLogOperation() { }
191
192 void WriteLogOperation::init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation,
193 uint64_t current_sync_gen,
194 uint64_t last_op_sequence_num,
195 bufferlist &write_req_bl, uint64_t buffer_offset,
196 bool persist_on_flush) {
197 log_entry->init(has_data, current_sync_gen, last_op_sequence_num,
198 persist_on_flush);
199 buffer_alloc = &(*allocation);
200 bl.substr_of(write_req_bl, buffer_offset, log_entry->write_bytes());
201 log_entry->init_cache_bl(write_req_bl, buffer_offset,
202 log_entry->write_bytes());
203 }
204
205 std::ostream &WriteLogOperation::format(std::ostream &os) const {
206 std::string op_name = is_writesame ? "(Write Same) " : "(Write) ";
207 os << op_name;
208 GenericWriteLogOperation::format(os);
209 if (log_entry) {
210 os << ", log_entry=[" << *log_entry << "]";
211 } else {
212 os << ", log_entry=nullptr";
213 }
214 os << ", bl=[" << bl << "], buffer_alloc=" << buffer_alloc;
215 return os;
216 }
217
218 std::ostream &operator<<(std::ostream &os,
219 const WriteLogOperation &op) {
220 return op.format(os);
221 }
222
223
224 void WriteLogOperation::complete(int result) {
225 GenericWriteLogOperation::complete(result);
226 m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_buf_t,
227 buf_persist_start_time - dispatch_time);
228 utime_t buf_persist_lat = buf_persist_comp_time - buf_persist_start_time;
229 m_perfcounter->tinc(l_librbd_pwl_log_op_buf_to_bufc_t, buf_persist_lat);
230 m_perfcounter->hinc(l_librbd_pwl_log_op_buf_to_bufc_t_hist,
231 buf_persist_lat.to_nsec(),
232 log_entry->ram_entry.write_bytes);
233 m_perfcounter->tinc(l_librbd_pwl_log_op_buf_to_app_t,
234 log_append_start_time - buf_persist_start_time);
235 }
236
237 WriteLogOperationSet::WriteLogOperationSet(utime_t dispatched, PerfCounters *perfcounter, std::shared_ptr<SyncPoint> sync_point,
238 bool persist_on_flush, CephContext *cct, Context *on_finish)
239 : m_cct(cct), m_on_finish(on_finish),
240 persist_on_flush(persist_on_flush),
241 dispatch_time(dispatched),
242 perfcounter(perfcounter),
243 sync_point(sync_point) {
244 on_ops_appending = sync_point->prior_persisted_gather_new_sub();
245 on_ops_persist = nullptr;
246 extent_ops_persist =
247 new C_Gather(m_cct,
248 new LambdaContext( [this](int r) {
249 ldout(this->m_cct,20) << __func__ << " " << this << " m_extent_ops_persist completed" << dendl;
250 if (on_ops_persist) {
251 on_ops_persist->complete(r);
252 }
253 m_on_finish->complete(r);
254 }));
255 auto appending_persist_sub = extent_ops_persist->new_sub();
256 extent_ops_appending =
257 new C_Gather(m_cct,
258 new LambdaContext( [this, appending_persist_sub](int r) {
259 ldout(this->m_cct, 20) << __func__ << " " << this << " m_extent_ops_appending completed" << dendl;
260 on_ops_appending->complete(r);
261 appending_persist_sub->complete(r);
262 }));
263 }
264
265 WriteLogOperationSet::~WriteLogOperationSet() { }
266
267 std::ostream &operator<<(std::ostream &os,
268 const WriteLogOperationSet &s) {
269 os << "cell=" << (void*)s.cell
270 << ", extent_ops_appending=" << s.extent_ops_appending
271 << ", extent_ops_persist=" << s.extent_ops_persist;
272 return os;
273 }
274
275 DiscardLogOperation::DiscardLogOperation(std::shared_ptr<SyncPoint> sync_point,
276 uint64_t image_offset_bytes,
277 uint64_t write_bytes,
278 uint32_t discard_granularity_bytes,
279 utime_t dispatch_time,
280 PerfCounters *perfcounter,
281 CephContext *cct)
282 : GenericWriteLogOperation(sync_point, dispatch_time, perfcounter, cct),
283 log_entry(std::make_shared<DiscardLogEntry>(sync_point->log_entry,
284 image_offset_bytes,
285 write_bytes,
286 discard_granularity_bytes)) {
287 on_write_persist = nullptr;
288 log_entry->sync_point_entry->writes++;
289 log_entry->sync_point_entry->bytes += write_bytes;
290 }
291
292 DiscardLogOperation::~DiscardLogOperation() { }
293
294 std::ostream &DiscardLogOperation::format(std::ostream &os) const {
295 os << "(Discard) ";
296 GenericWriteLogOperation::format(os);
297 if (log_entry) {
298 os << ", log_entry=[" << *log_entry << "]";
299 } else {
300 os << ", log_entry=nullptr";
301 }
302 return os;
303 }
304
305 std::ostream &operator<<(std::ostream &os,
306 const DiscardLogOperation &op) {
307 return op.format(os);
308 }
309
310 } // namespace pwl
311 } // namespace cache
312 } // namespace librbd