1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "LogOperation.h"
6 #include "librbd/cache/rwl/Types.h"
8 #define dout_subsys ceph_subsys_rbd_rwl
10 #define dout_prefix *_dout << "librbd::cache::rwl::LogOperation: " << this << " " \
19 GenericLogOperation::GenericLogOperation(const utime_t dispatch_time
, PerfCounters
*perfcounter
)
20 : m_perfcounter(perfcounter
), dispatch_time(dispatch_time
) {
23 std::ostream
& GenericLogOperation::format(std::ostream
&os
) const {
24 os
<< "dispatch_time=[" << dispatch_time
<< "], "
25 << "buf_persist_time=[" << buf_persist_time
<< "], "
26 << "buf_persist_comp_time=[" << buf_persist_comp_time
<< "], "
27 << "log_append_time=[" << log_append_time
<< "], "
28 << "log_append_comp_time=[" << log_append_comp_time
<< "], ";
32 std::ostream
&operator<<(std::ostream
&os
,
33 const GenericLogOperation
&op
) {
37 SyncPointLogOperation::SyncPointLogOperation(ceph::mutex
&lock
,
38 std::shared_ptr
<SyncPoint
> sync_point
,
39 const utime_t dispatch_time
,
40 PerfCounters
*perfcounter
,
42 : GenericLogOperation(dispatch_time
, perfcounter
), m_cct(cct
), m_lock(lock
), sync_point(sync_point
) {
45 SyncPointLogOperation::~SyncPointLogOperation() { }
47 std::ostream
&SyncPointLogOperation::format(std::ostream
&os
) const {
48 os
<< "(Sync Point) ";
49 GenericLogOperation::format(os
);
51 << "sync_point=[" << *sync_point
<< "]";
55 std::ostream
&operator<<(std::ostream
&os
,
56 const SyncPointLogOperation
&op
) {
60 std::vector
<Context
*> SyncPointLogOperation::append_sync_point() {
61 std::vector
<Context
*> appending_contexts
;
62 std::lock_guard
locker(m_lock
);
63 if (!sync_point
->appending
) {
64 sync_point
->appending
= true;
66 appending_contexts
.swap(sync_point
->on_sync_point_appending
);
67 return appending_contexts
;
70 void SyncPointLogOperation::clear_earlier_sync_point() {
71 std::lock_guard
locker(m_lock
);
72 ceph_assert(sync_point
->later_sync_point
);
73 ceph_assert(sync_point
->later_sync_point
->earlier_sync_point
==
75 sync_point
->later_sync_point
->earlier_sync_point
= nullptr;
78 std::vector
<Context
*> SyncPointLogOperation::swap_on_sync_point_persisted() {
79 std::lock_guard
locker(m_lock
);
80 std::vector
<Context
*> persisted_contexts
;
81 persisted_contexts
.swap(sync_point
->on_sync_point_persisted
);
82 return persisted_contexts
;
85 void SyncPointLogOperation::appending() {
86 ceph_assert(sync_point
);
87 ldout(m_cct
, 20) << "Sync point op=[" << *this
88 << "] appending" << dendl
;
89 auto appending_contexts
= append_sync_point();
90 for (auto &ctx
: appending_contexts
) {
95 void SyncPointLogOperation::complete(int result
) {
96 ceph_assert(sync_point
);
97 ldout(m_cct
, 20) << "Sync point op =[" << *this
98 << "] completed" << dendl
;
99 clear_earlier_sync_point();
101 /* Do append now in case completion occurred before the
102 * normal append callback executed, and to handle
103 * on_append work that was queued after the sync point
104 * entered the appending state. */
106 auto persisted_contexts
= swap_on_sync_point_persisted();
107 for (auto &ctx
: persisted_contexts
) {
108 ctx
->complete(result
);
112 GenericWriteLogOperation::GenericWriteLogOperation(std::shared_ptr
<SyncPoint
> sync_point
,
113 const utime_t dispatch_time
,
114 PerfCounters
*perfcounter
,
116 : GenericLogOperation(dispatch_time
, perfcounter
),
117 m_lock(ceph::make_mutex(util::unique_lock_name(
118 "librbd::cache::rwl::GenericWriteLogOperation::m_lock", this))),
120 sync_point(sync_point
) {
123 GenericWriteLogOperation::~GenericWriteLogOperation() { }
125 std::ostream
&GenericWriteLogOperation::format(std::ostream
&os
) const {
126 GenericLogOperation::format(os
);
130 std::ostream
&operator<<(std::ostream
&os
,
131 const GenericWriteLogOperation
&op
) {
132 return op
.format(os
);
135 /* Called when the write log operation is appending and its log position is guaranteed */
136 void GenericWriteLogOperation::appending() {
137 Context
*on_append
= nullptr;
138 ldout(m_cct
, 20) << __func__
<< " " << this << dendl
;
140 std::lock_guard
locker(m_lock
);
141 on_append
= on_write_append
;
142 on_write_append
= nullptr;
145 ldout(m_cct
, 20) << __func__
<< " " << this << " on_append=" << on_append
<< dendl
;
146 on_append
->complete(0);
150 /* Called when the write log operation is completed in all log replicas */
151 void GenericWriteLogOperation::complete(int result
) {
153 Context
*on_persist
= nullptr;
154 ldout(m_cct
, 20) << __func__
<< " " << this << dendl
;
156 std::lock_guard
locker(m_lock
);
157 on_persist
= on_write_persist
;
158 on_write_persist
= nullptr;
161 ldout(m_cct
, 20) << __func__
<< " " << this << " on_persist=" << on_persist
<< dendl
;
162 on_persist
->complete(result
);
166 WriteLogOperation::WriteLogOperation(WriteLogOperationSet
&set
,
167 uint64_t image_offset_bytes
, uint64_t write_bytes
,
169 : GenericWriteLogOperation(set
.sync_point
, set
.dispatch_time
, set
.perfcounter
, cct
),
170 log_entry(std::make_shared
<WriteLogEntry
>(set
.sync_point
->log_entry
, image_offset_bytes
, write_bytes
)) {
171 on_write_append
= set
.extent_ops_appending
->new_sub();
172 on_write_persist
= set
.extent_ops_persist
->new_sub();
173 log_entry
->sync_point_entry
->writes
++;
174 log_entry
->sync_point_entry
->bytes
+= write_bytes
;
177 WriteLogOperation::~WriteLogOperation() { }
179 void WriteLogOperation::init(bool has_data
, std::vector
<WriteBufferAllocation
>::iterator allocation
, uint64_t current_sync_gen
,
180 uint64_t last_op_sequence_num
, bufferlist
&write_req_bl
, uint64_t buffer_offset
,
181 bool persist_on_flush
) {
182 log_entry
->init(has_data
, allocation
, current_sync_gen
, last_op_sequence_num
, persist_on_flush
);
183 buffer_alloc
= &(*allocation
);
184 bl
.substr_of(write_req_bl
, buffer_offset
,
185 log_entry
->write_bytes());
188 std::ostream
&WriteLogOperation::format(std::ostream
&os
) const {
190 GenericWriteLogOperation::format(os
);
193 os
<< "log_entry=[" << *log_entry
<< "], ";
195 os
<< "log_entry=nullptr, ";
197 os
<< "bl=[" << bl
<< "],"
198 << "buffer_alloc=" << buffer_alloc
;
202 std::ostream
&operator<<(std::ostream
&os
,
203 const WriteLogOperation
&op
) {
204 return op
.format(os
);
208 void WriteLogOperation::complete(int result
) {
209 GenericWriteLogOperation::complete(result
);
210 m_perfcounter
->tinc(l_librbd_rwl_log_op_dis_to_buf_t
, buf_persist_time
- dispatch_time
);
211 utime_t buf_lat
= buf_persist_comp_time
- buf_persist_time
;
212 m_perfcounter
->tinc(l_librbd_rwl_log_op_buf_to_bufc_t
, buf_lat
);
213 m_perfcounter
->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist
, buf_lat
.to_nsec(),
214 log_entry
->ram_entry
.write_bytes
);
215 m_perfcounter
->tinc(l_librbd_rwl_log_op_buf_to_app_t
, log_append_time
- buf_persist_time
);
218 void WriteLogOperation::copy_bl_to_pmem_buffer() {
219 /* operation is a shared_ptr, so write_op is only good as long as operation is in scope */
220 bufferlist::iterator
i(&bl
);
221 m_perfcounter
->inc(l_librbd_rwl_log_op_bytes
, log_entry
->write_bytes());
222 ldout(m_cct
, 20) << bl
<< dendl
;
223 i
.copy((unsigned)log_entry
->write_bytes(), (char*)log_entry
->pmem_buffer
);
226 void WriteLogOperation::flush_pmem_buf_to_cache(PMEMobjpool
*log_pool
) {
227 buf_persist_time
= ceph_clock_now();
228 pmemobj_flush(log_pool
, log_entry
->pmem_buffer
, log_entry
->write_bytes());
231 WriteLogOperationSet::WriteLogOperationSet(utime_t dispatched
, PerfCounters
*perfcounter
, std::shared_ptr
<SyncPoint
> sync_point
,
232 bool persist_on_flush
, CephContext
*cct
, Context
*on_finish
)
233 : m_cct(cct
), m_on_finish(on_finish
),
234 persist_on_flush(persist_on_flush
),
235 dispatch_time(dispatched
),
236 perfcounter(perfcounter
),
237 sync_point(sync_point
) {
238 on_ops_appending
= sync_point
->prior_log_entries_persisted
->new_sub();
239 on_ops_persist
= nullptr;
242 new LambdaContext( [this](int r
) {
243 ldout(this->m_cct
,20) << __func__
<< " " << this << " m_extent_ops_persist completed" << dendl
;
244 if (on_ops_persist
) {
245 on_ops_persist
->complete(r
);
247 m_on_finish
->complete(r
);
249 auto appending_persist_sub
= extent_ops_persist
->new_sub();
250 extent_ops_appending
=
252 new LambdaContext( [this, appending_persist_sub
](int r
) {
253 ldout(this->m_cct
, 20) << __func__
<< " " << this << " m_extent_ops_appending completed" << dendl
;
254 on_ops_appending
->complete(r
);
255 appending_persist_sub
->complete(r
);
259 WriteLogOperationSet::~WriteLogOperationSet() { }
261 std::ostream
&operator<<(std::ostream
&os
,
262 const WriteLogOperationSet
&s
) {
263 os
<< "cell=" << (void*)s
.cell
<< ", "
264 << "extent_ops_appending=[" << s
.extent_ops_appending
<< ", "
265 << "extent_ops_persist=[" << s
.extent_ops_persist
<< "]";
271 } // namespace librbd