1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "LogOperation.h"
6 #include "librbd/cache/pwl/Types.h"
8 #define dout_subsys ceph_subsys_rbd_pwl
10 #define dout_prefix *_dout << "librbd::cache::pwl::LogOperation: " << this \
11 << " " << __func__ << ": "
// Construct the base log operation: store the perf counter handle in
// m_perfcounter and remember the time the operation was dispatched.
17 GenericLogOperation::GenericLogOperation(utime_t dispatch_time
,
18 PerfCounters
*perfcounter
)
19 : m_perfcounter(perfcounter
), dispatch_time(dispatch_time
) {
// Write this operation's timestamps to `os` as comma-separated
// `name=[value]` pairs, in the order: dispatch_time, buf_persist_start_time,
// buf_persist_comp_time, log_append_start_time, log_append_comp_time.
22 std::ostream
& GenericLogOperation::format(std::ostream
&os
) const {
23 os
<< "dispatch_time=[" << dispatch_time
24 << "], buf_persist_start_time=[" << buf_persist_start_time
25 << "], buf_persist_comp_time=[" << buf_persist_comp_time
26 << "], log_append_start_time=[" << log_append_start_time
27 << "], log_append_comp_time=[" << log_append_comp_time
<< "]";
// Stream-insertion overload taking a GenericLogOperation by const reference.
31 std::ostream
&operator<<(std::ostream
&os
,
32 const GenericLogOperation
&op
) {
// Construct a sync point log operation.
// dispatch_time and perfcounter are forwarded to the GenericLogOperation
// base; m_cct, m_lock and sync_point are initialized from cct, lock and
// sync_point respectively.
36 SyncPointLogOperation::SyncPointLogOperation(ceph::mutex
&lock
,
37 std::shared_ptr
<SyncPoint
> sync_point
,
38 utime_t dispatch_time
,
39 PerfCounters
*perfcounter
,
41 : GenericLogOperation(dispatch_time
, perfcounter
), m_cct(cct
), m_lock(lock
),
42 sync_point(sync_point
) {
// Destructor with an empty body; no explicit cleanup is performed here.
45 SyncPointLogOperation::~SyncPointLogOperation() { }
// Write a description of this sync point operation to `os`: the
// "(Sync Point) " tag, the GenericLogOperation fields, then the
// dereferenced sync_point.
47 std::ostream
&SyncPointLogOperation::format(std::ostream
&os
) const {
48 os
<< "(Sync Point) ";
49 GenericLogOperation::format(os
);
50 os
<< ", sync_point=[" << *sync_point
<< "]";
// Stream-insertion overload taking a SyncPointLogOperation by const reference.
54 std::ostream
&operator<<(std::ostream
&os
,
55 const SyncPointLogOperation
&op
) {
// Mark the sync point as appending and hand back the contexts that were
// waiting for that.
// With m_lock held: sets sync_point->appending if it was not already set,
// then swaps sync_point->on_sync_point_appending into a freshly constructed
// local vector and returns that vector.
59 std::vector
<Context
*> SyncPointLogOperation::append_sync_point() {
60 std::vector
<Context
*> appending_contexts
;
61 std::lock_guard
locker(m_lock
);
62 if (!sync_point
->appending
) {
63 sync_point
->appending
= true;
// The swap leaves the sync point holding the (empty) local vector's contents.
65 appending_contexts
.swap(sync_point
->on_sync_point_appending
);
66 return appending_contexts
;
// Unlink this sync point from the one that follows it.
// With m_lock held: asserts that later_sync_point is set and that its
// earlier_sync_point refers back to this sync point, then resets both
// links to nullptr.
69 void SyncPointLogOperation::clear_earlier_sync_point() {
70 std::lock_guard
locker(m_lock
);
71 ceph_assert(sync_point
->later_sync_point
);
72 ceph_assert(sync_point
->later_sync_point
->earlier_sync_point
== sync_point
);
// Clear the back-link first; it is reached through later_sync_point.
73 sync_point
->later_sync_point
->earlier_sync_point
= nullptr;
74 sync_point
->later_sync_point
= nullptr;
// Take the contexts registered in sync_point->on_sync_point_persisted.
// With m_lock held, swaps that list into a freshly constructed local vector
// and returns the local vector.
77 std::vector
<Context
*> SyncPointLogOperation::swap_on_sync_point_persisted() {
78 std::lock_guard
locker(m_lock
);
79 std::vector
<Context
*> persisted_contexts
;
80 persisted_contexts
.swap(sync_point
->on_sync_point_persisted
);
81 return persisted_contexts
;
// Handle this sync point operation reaching the appending stage.
// Asserts sync_point is set, logs at debug level 20, then obtains the
// pending contexts via append_sync_point() and iterates over them.
84 void SyncPointLogOperation::appending() {
85 ceph_assert(sync_point
);
86 ldout(m_cct
, 20) << "Sync point op=[" << *this
87 << "] appending" << dendl
;
88 auto appending_contexts
= append_sync_point();
89 for (auto &ctx
: appending_contexts
) {
// Handle completion of this sync point operation.
// Asserts sync_point is set, logs at debug level 20, unlinks the sync point
// via clear_earlier_sync_point(), then completes every context taken from
// swap_on_sync_point_persisted() with `result`.
94 void SyncPointLogOperation::complete(int result
) {
95 ceph_assert(sync_point
);
96 ldout(m_cct
, 20) << "Sync point op =[" << *this
97 << "] completed" << dendl
;
98 clear_earlier_sync_point();
100 /* Do append now in case completion occurred before the
101 * normal append callback executed, and to handle
102 * on_append work that was queued after the sync point
103 * entered the appending state. */
105 auto persisted_contexts
= swap_on_sync_point_persisted();
// Each persisted context receives the same result code passed to complete().
106 for (auto &ctx
: persisted_contexts
) {
107 ctx
->complete(result
);
// Construct a generic write log operation.
// dispatch_time and perfcounter are forwarded to the GenericLogOperation
// base, and sync_point is stored in the member of the same name.
111 GenericWriteLogOperation::GenericWriteLogOperation(std::shared_ptr
<SyncPoint
> sync_point
,
112 utime_t dispatch_time
,
113 PerfCounters
*perfcounter
,
115 : GenericLogOperation(dispatch_time
, perfcounter
),
// m_lock is created with a name built by pwl::unique_lock_name() from the
// class-qualified label below and this object's address.
116 m_lock(ceph::make_mutex(pwl::unique_lock_name(
117 "librbd::cache::pwl::GenericWriteLogOperation::m_lock", this))),
119 sync_point(sync_point
) {
// Destructor with an empty body; no explicit cleanup is performed here.
122 GenericWriteLogOperation::~GenericWriteLogOperation() { }
// Write a description of this operation to `os`; the visible work is a
// call to GenericLogOperation::format() to emit the base-class fields.
124 std::ostream
&GenericWriteLogOperation::format(std::ostream
&os
) const {
125 GenericLogOperation::format(os
);
// Stream-insertion overload for GenericWriteLogOperation; returns the result
// of op.format(os).
129 std::ostream
&operator<<(std::ostream
&os
,
130 const GenericWriteLogOperation
&op
) {
131 return op
.format(os
);
134 /* Called when the write log operation is appending and its log position is guaranteed */
// Detaches on_write_append from this object and completes it with 0.
135 void GenericWriteLogOperation::appending() {
136 Context
*on_append
= nullptr;
137 ldout(m_cct
, 20) << __func__
<< " " << this << dendl
;
// With m_lock held, move the member pointer into the local and clear the
// member.
139 std::lock_guard
locker(m_lock
);
140 on_append
= on_write_append
;
141 on_write_append
= nullptr;
144 ldout(m_cct
, 20) << __func__
<< " " << this << " on_append=" << on_append
<< dendl
;
145 on_append
->complete(0);
149 /* Called when the write log operation is completed in all log replicas */
// Detaches on_write_persist from this object and completes it with `result`.
150 void GenericWriteLogOperation::complete(int result
) {
152 Context
*on_persist
= nullptr;
153 ldout(m_cct
, 20) << __func__
<< " " << this << dendl
;
// With m_lock held, move the member pointer into the local and clear the
// member.
155 std::lock_guard
locker(m_lock
);
156 on_persist
= on_write_persist
;
157 on_write_persist
= nullptr;
160 ldout(m_cct
, 20) << __func__
<< " " << this << " on_persist=" << on_persist
162 on_persist
->complete(result
);
// Construct a write log operation belonging to the operation set `set`.
// The set's sync_point, dispatch_time and perfcounter, plus cct, are
// forwarded to GenericWriteLogOperation; write_log_entry is stored in
// log_entry. image_offset_bytes is not referenced in the visible body.
166 WriteLogOperation::WriteLogOperation(
167 WriteLogOperationSet
&set
, uint64_t image_offset_bytes
,
168 uint64_t write_bytes
, CephContext
*cct
,
169 std::shared_ptr
<WriteLogEntry
> write_log_entry
)
170 : GenericWriteLogOperation(set
.sync_point
, set
.dispatch_time
,
171 set
.perfcounter
, cct
),
172 log_entry(write_log_entry
) {
// Obtain this operation's append and persist contexts as new subs of the
// set's extent_ops_appending / extent_ops_persist objects.
173 on_write_append
= set
.extent_ops_appending
->new_sub();
174 on_write_persist
= set
.extent_ops_persist
->new_sub();
// Account for this write in the sync point entry's write and byte counters.
175 log_entry
->sync_point_entry
->writes
++;
176 log_entry
->sync_point_entry
->bytes
+= write_bytes
;
// Constructor overload taking a write-same log entry. Delegates to the
// WriteLogOperation constructor above, passing writesame_log_entry as the
// log entry argument.
179 WriteLogOperation::WriteLogOperation(WriteLogOperationSet
&set
,
180 uint64_t image_offset_bytes
,
181 uint64_t write_bytes
,
184 std::shared_ptr
<WriteLogEntry
> writesame_log_entry
)
185 : WriteLogOperation(set
, image_offset_bytes
, write_bytes
, cct
,
186 writesame_log_entry
) {
// Destructor with an empty body; no explicit cleanup is performed here.
190 WriteLogOperation::~WriteLogOperation() { }
// Initialize the log entry and the data buffers for this write.
// Calls log_entry->init() with has_data, current_sync_gen and
// last_op_sequence_num (among its arguments), records the buffer allocation
// that `allocation` refers to, and sets up bl and the log entry's cache
// buffer from write_req_bl.
192 void WriteLogOperation::init(bool has_data
, std::vector
<WriteBufferAllocation
>::iterator allocation
,
193 uint64_t current_sync_gen
,
194 uint64_t last_op_sequence_num
,
195 bufferlist
&write_req_bl
, uint64_t buffer_offset
,
196 bool persist_on_flush
) {
197 log_entry
->init(has_data
, current_sync_gen
, last_op_sequence_num
,
// Keep a pointer to the element the iterator designates.
199 buffer_alloc
= &(*allocation
);
// Both calls below use the same range of write_req_bl: it starts at
// buffer_offset and is log_entry->write_bytes() long.
200 bl
.substr_of(write_req_bl
, buffer_offset
, log_entry
->write_bytes());
201 log_entry
->init_cache_bl(write_req_bl
, buffer_offset
,
202 log_entry
->write_bytes());
// Write a description of this write operation to `os`: the
// GenericWriteLogOperation fields, log entry information, then bl and
// buffer_alloc.
205 std::ostream
&WriteLogOperation::format(std::ostream
&os
) const {
// Label distinguishing write-same operations from plain writes.
206 std::string op_name
= is_writesame
? "(Write Same) " : "(Write) ";
208 GenericWriteLogOperation::format(os
);
// NOTE(review): the two log_entry outputs below are presumably the two arms
// of a null check on log_entry -- confirm against the full file.
210 os
<< ", log_entry=[" << *log_entry
<< "]";
212 os
<< ", log_entry=nullptr";
214 os
<< ", bl=[" << bl
<< "], buffer_alloc=" << buffer_alloc
;
// Stream-insertion overload for WriteLogOperation; returns the result of
// op.format(os).
218 std::ostream
&operator<<(std::ostream
&os
,
219 const WriteLogOperation
&op
) {
220 return op
.format(os
);
// Complete the write operation and record its latency counters.
// First runs GenericWriteLogOperation::complete(result), then updates
// m_perfcounter with intervals derived from the operation's timestamps.
224 void WriteLogOperation::complete(int result
) {
225 GenericWriteLogOperation::complete(result
);
// Interval from dispatch to the start of buffer persist.
226 m_perfcounter
->tinc(l_librbd_pwl_log_op_dis_to_buf_t
,
227 buf_persist_start_time
- dispatch_time
);
// Interval from buffer persist start to buffer persist completion; recorded
// both as a time counter and, in nanoseconds against the entry's write
// size, in a histogram counter.
228 utime_t buf_persist_lat
= buf_persist_comp_time
- buf_persist_start_time
;
229 m_perfcounter
->tinc(l_librbd_pwl_log_op_buf_to_bufc_t
, buf_persist_lat
);
230 m_perfcounter
->hinc(l_librbd_pwl_log_op_buf_to_bufc_t_hist
,
231 buf_persist_lat
.to_nsec(),
232 log_entry
->ram_entry
.write_bytes
);
// Interval from buffer persist start to log append start.
233 m_perfcounter
->tinc(l_librbd_pwl_log_op_buf_to_app_t
,
234 log_append_start_time
- buf_persist_start_time
);
// Construct an operation set.
// Stores cct, on_finish, persist_on_flush, the dispatch time, perfcounter
// and sync_point in the corresponding members, then wires up the contexts
// used to track appending and persisting of the set's operations.
237 WriteLogOperationSet::WriteLogOperationSet(utime_t dispatched
, PerfCounters
*perfcounter
, std::shared_ptr
<SyncPoint
> sync_point
,
238 bool persist_on_flush
, CephContext
*cct
, Context
*on_finish
)
239 : m_cct(cct
), m_on_finish(on_finish
),
240 persist_on_flush(persist_on_flush
),
241 dispatch_time(dispatched
),
242 perfcounter(perfcounter
),
243 sync_point(sync_point
) {
// on_ops_appending is obtained from the sync point; on_ops_persist starts
// out unset.
244 on_ops_appending
= sync_point
->prior_persisted_gather_new_sub();
245 on_ops_persist
= nullptr;
// This callback captures `this`. When run it logs, completes on_ops_persist
// if one has been set, and completes m_on_finish, all with the same r.
248 new LambdaContext( [this](int r
) {
249 ldout(this->m_cct
,20) << __func__
<< " " << this << " m_extent_ops_persist completed" << dendl
;
250 if (on_ops_persist
) {
251 on_ops_persist
->complete(r
);
253 m_on_finish
->complete(r
);
// A sub of extent_ops_persist is created here and completed by the
// appending callback below.
255 auto appending_persist_sub
= extent_ops_persist
->new_sub();
256 extent_ops_appending
=
// This callback captures `this` and the sub created above. When run it logs,
// then completes on_ops_appending and the captured sub with r.
258 new LambdaContext( [this, appending_persist_sub
](int r
) {
259 ldout(this->m_cct
, 20) << __func__
<< " " << this << " m_extent_ops_appending completed" << dendl
;
260 on_ops_appending
->complete(r
);
261 appending_persist_sub
->complete(r
);
// Destructor with an empty body; no explicit cleanup is performed here.
265 WriteLogOperationSet::~WriteLogOperationSet() { }
// Stream-insertion overload for WriteLogOperationSet. Writes the set's cell
// (cast to void*), extent_ops_appending and extent_ops_persist members as
// comma-separated `name=value` pairs.
267 std::ostream
&operator<<(std::ostream
&os
,
268 const WriteLogOperationSet
&s
) {
269 os
<< "cell=" << (void*)s
.cell
270 << ", extent_ops_appending=" << s
.extent_ops_appending
271 << ", extent_ops_persist=" << s
.extent_ops_persist
;
// Construct a discard log operation.
// sync_point, dispatch_time, perfcounter and cct are forwarded to
// GenericWriteLogOperation, and a new DiscardLogEntry is created for
// log_entry.
275 DiscardLogOperation::DiscardLogOperation(std::shared_ptr
<SyncPoint
> sync_point
,
276 uint64_t image_offset_bytes
,
277 uint64_t write_bytes
,
278 uint32_t discard_granularity_bytes
,
279 utime_t dispatch_time
,
280 PerfCounters
*perfcounter
,
282 : GenericWriteLogOperation(sync_point
, dispatch_time
, perfcounter
, cct
),
// The entry is built with std::make_shared; the visible arguments are the
// sync point's log_entry and discard_granularity_bytes.
283 log_entry(std::make_shared
<DiscardLogEntry
>(sync_point
->log_entry
,
286 discard_granularity_bytes
)) {
// No persist context is attached at construction time.
287 on_write_persist
= nullptr;
// Account for this discard in the sync point entry's write and byte
// counters.
288 log_entry
->sync_point_entry
->writes
++;
289 log_entry
->sync_point_entry
->bytes
+= write_bytes
;
// Destructor with an empty body; no explicit cleanup is performed here.
292 DiscardLogOperation::~DiscardLogOperation() { }
// Write a description of this discard operation to `os`: the
// GenericWriteLogOperation fields followed by log entry information.
294 std::ostream
&DiscardLogOperation::format(std::ostream
&os
) const {
296 GenericWriteLogOperation::format(os
);
// NOTE(review): the two log_entry outputs below are presumably the two arms
// of a null check on log_entry -- confirm against the full file.
298 os
<< ", log_entry=[" << *log_entry
<< "]";
300 os
<< ", log_entry=nullptr";
// Stream-insertion overload for DiscardLogOperation; returns the result of
// op.format(os).
305 std::ostream
&operator<<(std::ostream
&os
,
306 const DiscardLogOperation
&op
) {
307 return op
.format(os
);
312 } // namespace librbd