1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "LogOperation.h"
6 #include "librbd/cache/pwl/Types.h"
8 #define dout_subsys ceph_subsys_rbd_pwl
10 #define dout_prefix *_dout << "librbd::cache::pwl::LogOperation: " << this \
11 << " " << __func__ << ": "
17 GenericLogOperation::GenericLogOperation(utime_t dispatch_time
,
18 PerfCounters
*perfcounter
)
19 : m_perfcounter(perfcounter
), dispatch_time(dispatch_time
) {
22 std::ostream
& GenericLogOperation::format(std::ostream
&os
) const {
23 os
<< "dispatch_time=[" << dispatch_time
<< "], "
24 << "buf_persist_time=[" << buf_persist_time
<< "], "
25 << "buf_persist_comp_time=[" << buf_persist_comp_time
<< "], "
26 << "log_append_time=[" << log_append_time
<< "], "
27 << "log_append_comp_time=[" << log_append_comp_time
<< "], ";
31 std::ostream
&operator<<(std::ostream
&os
,
32 const GenericLogOperation
&op
) {
36 SyncPointLogOperation::SyncPointLogOperation(ceph::mutex
&lock
,
37 std::shared_ptr
<SyncPoint
> sync_point
,
38 utime_t dispatch_time
,
39 PerfCounters
*perfcounter
,
41 : GenericLogOperation(dispatch_time
, perfcounter
), m_cct(cct
), m_lock(lock
),
42 sync_point(sync_point
) {
45 SyncPointLogOperation::~SyncPointLogOperation() { }
47 std::ostream
&SyncPointLogOperation::format(std::ostream
&os
) const {
48 os
<< "(Sync Point) ";
49 GenericLogOperation::format(os
);
51 << "sync_point=[" << *sync_point
<< "]";
55 std::ostream
&operator<<(std::ostream
&os
,
56 const SyncPointLogOperation
&op
) {
60 std::vector
<Context
*> SyncPointLogOperation::append_sync_point() {
61 std::vector
<Context
*> appending_contexts
;
62 std::lock_guard
locker(m_lock
);
63 if (!sync_point
->appending
) {
64 sync_point
->appending
= true;
66 appending_contexts
.swap(sync_point
->on_sync_point_appending
);
67 return appending_contexts
;
70 void SyncPointLogOperation::clear_earlier_sync_point() {
71 std::lock_guard
locker(m_lock
);
72 ceph_assert(sync_point
->later_sync_point
);
73 ceph_assert(sync_point
->later_sync_point
->earlier_sync_point
==
75 sync_point
->later_sync_point
->earlier_sync_point
= nullptr;
78 std::vector
<Context
*> SyncPointLogOperation::swap_on_sync_point_persisted() {
79 std::lock_guard
locker(m_lock
);
80 std::vector
<Context
*> persisted_contexts
;
81 persisted_contexts
.swap(sync_point
->on_sync_point_persisted
);
82 return persisted_contexts
;
85 void SyncPointLogOperation::appending() {
86 ceph_assert(sync_point
);
87 ldout(m_cct
, 20) << "Sync point op=[" << *this
88 << "] appending" << dendl
;
89 auto appending_contexts
= append_sync_point();
90 for (auto &ctx
: appending_contexts
) {
95 void SyncPointLogOperation::complete(int result
) {
96 ceph_assert(sync_point
);
97 ldout(m_cct
, 20) << "Sync point op =[" << *this
98 << "] completed" << dendl
;
99 clear_earlier_sync_point();
101 /* Do append now in case completion occurred before the
102 * normal append callback executed, and to handle
103 * on_append work that was queued after the sync point
104 * entered the appending state. */
106 auto persisted_contexts
= swap_on_sync_point_persisted();
107 for (auto &ctx
: persisted_contexts
) {
108 ctx
->complete(result
);
112 GenericWriteLogOperation::GenericWriteLogOperation(std::shared_ptr
<SyncPoint
> sync_point
,
113 utime_t dispatch_time
,
114 PerfCounters
*perfcounter
,
116 : GenericLogOperation(dispatch_time
, perfcounter
),
117 m_lock(ceph::make_mutex(pwl::unique_lock_name(
118 "librbd::cache::pwl::GenericWriteLogOperation::m_lock", this))),
120 sync_point(sync_point
) {
123 GenericWriteLogOperation::~GenericWriteLogOperation() { }
125 std::ostream
&GenericWriteLogOperation::format(std::ostream
&os
) const {
126 GenericLogOperation::format(os
);
130 std::ostream
&operator<<(std::ostream
&os
,
131 const GenericWriteLogOperation
&op
) {
132 return op
.format(os
);
135 /* Called when the write log operation is appending and its log position is guaranteed */
136 void GenericWriteLogOperation::appending() {
137 Context
*on_append
= nullptr;
138 ldout(m_cct
, 20) << __func__
<< " " << this << dendl
;
140 std::lock_guard
locker(m_lock
);
141 on_append
= on_write_append
;
142 on_write_append
= nullptr;
145 ldout(m_cct
, 20) << __func__
<< " " << this << " on_append=" << on_append
<< dendl
;
146 on_append
->complete(0);
150 /* Called when the write log operation is completed in all log replicas */
151 void GenericWriteLogOperation::complete(int result
) {
153 Context
*on_persist
= nullptr;
154 ldout(m_cct
, 20) << __func__
<< " " << this << dendl
;
156 std::lock_guard
locker(m_lock
);
157 on_persist
= on_write_persist
;
158 on_write_persist
= nullptr;
161 ldout(m_cct
, 20) << __func__
<< " " << this << " on_persist=" << on_persist
163 on_persist
->complete(result
);
167 WriteLogOperation::WriteLogOperation(
168 WriteLogOperationSet
&set
, uint64_t image_offset_bytes
,
169 uint64_t write_bytes
, CephContext
*cct
,
170 std::shared_ptr
<WriteLogEntry
> write_log_entry
)
171 : GenericWriteLogOperation(set
.sync_point
, set
.dispatch_time
,
172 set
.perfcounter
, cct
),
173 log_entry(write_log_entry
) {
174 on_write_append
= set
.extent_ops_appending
->new_sub();
175 on_write_persist
= set
.extent_ops_persist
->new_sub();
176 log_entry
->sync_point_entry
->writes
++;
177 log_entry
->sync_point_entry
->bytes
+= write_bytes
;
180 WriteLogOperation::WriteLogOperation(WriteLogOperationSet
&set
,
181 uint64_t image_offset_bytes
,
182 uint64_t write_bytes
,
185 std::shared_ptr
<WriteLogEntry
> writesame_log_entry
)
186 : WriteLogOperation(set
, image_offset_bytes
, write_bytes
, cct
,
187 writesame_log_entry
) {
191 WriteLogOperation::~WriteLogOperation() { }
193 void WriteLogOperation::init(bool has_data
, std::vector
<WriteBufferAllocation
>::iterator allocation
,
194 uint64_t current_sync_gen
,
195 uint64_t last_op_sequence_num
,
196 bufferlist
&write_req_bl
, uint64_t buffer_offset
,
197 bool persist_on_flush
) {
198 log_entry
->init(has_data
, current_sync_gen
, last_op_sequence_num
,
200 buffer_alloc
= &(*allocation
);
201 bl
.substr_of(write_req_bl
, buffer_offset
, log_entry
->write_bytes());
202 log_entry
->init_cache_bl(write_req_bl
, buffer_offset
,
203 log_entry
->write_bytes());
206 std::ostream
&WriteLogOperation::format(std::ostream
&os
) const {
207 string op_name
= is_writesame
? "(Write Same) " : "(Write) ";
209 GenericWriteLogOperation::format(os
);
212 os
<< "log_entry=[" << *log_entry
<< "], ";
214 os
<< "log_entry=nullptr, ";
216 os
<< "bl=[" << bl
<< "],"
217 << "buffer_alloc=" << buffer_alloc
;
221 std::ostream
&operator<<(std::ostream
&os
,
222 const WriteLogOperation
&op
) {
223 return op
.format(os
);
227 void WriteLogOperation::complete(int result
) {
228 GenericWriteLogOperation::complete(result
);
229 m_perfcounter
->tinc(l_librbd_pwl_log_op_dis_to_buf_t
, buf_persist_time
- dispatch_time
);
230 utime_t buf_lat
= buf_persist_comp_time
- buf_persist_time
;
231 m_perfcounter
->tinc(l_librbd_pwl_log_op_buf_to_bufc_t
, buf_lat
);
232 m_perfcounter
->hinc(l_librbd_pwl_log_op_buf_to_bufc_t_hist
, buf_lat
.to_nsec(),
233 log_entry
->ram_entry
.write_bytes
);
234 m_perfcounter
->tinc(l_librbd_pwl_log_op_buf_to_app_t
, log_append_time
- buf_persist_time
);
237 WriteLogOperationSet::WriteLogOperationSet(utime_t dispatched
, PerfCounters
*perfcounter
, std::shared_ptr
<SyncPoint
> sync_point
,
238 bool persist_on_flush
, CephContext
*cct
, Context
*on_finish
)
239 : m_cct(cct
), m_on_finish(on_finish
),
240 persist_on_flush(persist_on_flush
),
241 dispatch_time(dispatched
),
242 perfcounter(perfcounter
),
243 sync_point(sync_point
) {
244 on_ops_appending
= sync_point
->prior_persisted_gather_new_sub();
245 on_ops_persist
= nullptr;
248 new LambdaContext( [this](int r
) {
249 ldout(this->m_cct
,20) << __func__
<< " " << this << " m_extent_ops_persist completed" << dendl
;
250 if (on_ops_persist
) {
251 on_ops_persist
->complete(r
);
253 m_on_finish
->complete(r
);
255 auto appending_persist_sub
= extent_ops_persist
->new_sub();
256 extent_ops_appending
=
258 new LambdaContext( [this, appending_persist_sub
](int r
) {
259 ldout(this->m_cct
, 20) << __func__
<< " " << this << " m_extent_ops_appending completed" << dendl
;
260 on_ops_appending
->complete(r
);
261 appending_persist_sub
->complete(r
);
265 WriteLogOperationSet::~WriteLogOperationSet() { }
267 std::ostream
&operator<<(std::ostream
&os
,
268 const WriteLogOperationSet
&s
) {
269 os
<< "cell=" << (void*)s
.cell
<< ", "
270 << "extent_ops_appending=[" << s
.extent_ops_appending
<< ", "
271 << "extent_ops_persist=[" << s
.extent_ops_persist
<< "]";
275 DiscardLogOperation::DiscardLogOperation(std::shared_ptr
<SyncPoint
> sync_point
,
276 uint64_t image_offset_bytes
,
277 uint64_t write_bytes
,
278 uint32_t discard_granularity_bytes
,
279 utime_t dispatch_time
,
280 PerfCounters
*perfcounter
,
282 : GenericWriteLogOperation(sync_point
, dispatch_time
, perfcounter
, cct
),
283 log_entry(std::make_shared
<DiscardLogEntry
>(sync_point
->log_entry
,
286 discard_granularity_bytes
)) {
287 on_write_persist
= nullptr;
288 log_entry
->sync_point_entry
->writes
++;
289 log_entry
->sync_point_entry
->bytes
+= write_bytes
;
292 DiscardLogOperation::~DiscardLogOperation() { }
294 std::ostream
&DiscardLogOperation::format(std::ostream
&os
) const {
296 GenericWriteLogOperation::format(os
);
299 os
<< "log_entry=[" << *log_entry
<< "], ";
301 os
<< "log_entry=nullptr, ";
306 std::ostream
&operator<<(std::ostream
&os
,
307 const DiscardLogOperation
&op
) {
308 return op
.format(os
);
313 } // namespace librbd