]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/cache/rwl/LogOperation.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / librbd / cache / rwl / LogOperation.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <iostream>
5 #include "LogOperation.h"
6 #include "librbd/cache/rwl/Types.h"
7
8 #define dout_subsys ceph_subsys_rbd_rwl
9 #undef dout_prefix
10 #define dout_prefix *_dout << "librbd::cache::rwl::LogOperation: " << this << " " \
11 << __func__ << ": "
12
13 namespace librbd {
14
15 namespace cache {
16
17 namespace rwl {
18
19 GenericLogOperation::GenericLogOperation(const utime_t dispatch_time, PerfCounters *perfcounter)
20 : m_perfcounter(perfcounter), dispatch_time(dispatch_time) {
21 }
22
23 std::ostream& GenericLogOperation::format(std::ostream &os) const {
24 os << "dispatch_time=[" << dispatch_time << "], "
25 << "buf_persist_time=[" << buf_persist_time << "], "
26 << "buf_persist_comp_time=[" << buf_persist_comp_time << "], "
27 << "log_append_time=[" << log_append_time << "], "
28 << "log_append_comp_time=[" << log_append_comp_time << "], ";
29 return os;
30 };
31
32 std::ostream &operator<<(std::ostream &os,
33 const GenericLogOperation &op) {
34 return op.format(os);
35 }
36
37 SyncPointLogOperation::SyncPointLogOperation(ceph::mutex &lock,
38 std::shared_ptr<SyncPoint> sync_point,
39 const utime_t dispatch_time,
40 PerfCounters *perfcounter,
41 CephContext *cct)
42 : GenericLogOperation(dispatch_time, perfcounter), m_cct(cct), m_lock(lock), sync_point(sync_point) {
43 }
44
45 SyncPointLogOperation::~SyncPointLogOperation() { }
46
47 std::ostream &SyncPointLogOperation::format(std::ostream &os) const {
48 os << "(Sync Point) ";
49 GenericLogOperation::format(os);
50 os << ", "
51 << "sync_point=[" << *sync_point << "]";
52 return os;
53 };
54
55 std::ostream &operator<<(std::ostream &os,
56 const SyncPointLogOperation &op) {
57 return op.format(os);
58 }
59
60 std::vector<Context*> SyncPointLogOperation::append_sync_point() {
61 std::vector<Context*> appending_contexts;
62 std::lock_guard locker(m_lock);
63 if (!sync_point->appending) {
64 sync_point->appending = true;
65 }
66 appending_contexts.swap(sync_point->on_sync_point_appending);
67 return appending_contexts;
68 }
69
70 void SyncPointLogOperation::clear_earlier_sync_point() {
71 std::lock_guard locker(m_lock);
72 ceph_assert(sync_point->later_sync_point);
73 ceph_assert(sync_point->later_sync_point->earlier_sync_point ==
74 sync_point);
75 sync_point->later_sync_point->earlier_sync_point = nullptr;
76 }
77
78 std::vector<Context*> SyncPointLogOperation::swap_on_sync_point_persisted() {
79 std::lock_guard locker(m_lock);
80 std::vector<Context*> persisted_contexts;
81 persisted_contexts.swap(sync_point->on_sync_point_persisted);
82 return persisted_contexts;
83 }
84
85 void SyncPointLogOperation::appending() {
86 ceph_assert(sync_point);
87 ldout(m_cct, 20) << "Sync point op=[" << *this
88 << "] appending" << dendl;
89 auto appending_contexts = append_sync_point();
90 for (auto &ctx : appending_contexts) {
91 ctx->complete(0);
92 }
93 }
94
95 void SyncPointLogOperation::complete(int result) {
96 ceph_assert(sync_point);
97 ldout(m_cct, 20) << "Sync point op =[" << *this
98 << "] completed" << dendl;
99 clear_earlier_sync_point();
100
101 /* Do append now in case completion occurred before the
102 * normal append callback executed, and to handle
103 * on_append work that was queued after the sync point
104 * entered the appending state. */
105 appending();
106 auto persisted_contexts = swap_on_sync_point_persisted();
107 for (auto &ctx : persisted_contexts) {
108 ctx->complete(result);
109 }
110 }
111
112 GenericWriteLogOperation::GenericWriteLogOperation(std::shared_ptr<SyncPoint> sync_point,
113 const utime_t dispatch_time,
114 PerfCounters *perfcounter,
115 CephContext *cct)
116 : GenericLogOperation(dispatch_time, perfcounter),
117 m_lock(ceph::make_mutex(util::unique_lock_name(
118 "librbd::cache::rwl::GenericWriteLogOperation::m_lock", this))),
119 m_cct(cct),
120 sync_point(sync_point) {
121 }
122
123 GenericWriteLogOperation::~GenericWriteLogOperation() { }
124
125 std::ostream &GenericWriteLogOperation::format(std::ostream &os) const {
126 GenericLogOperation::format(os);
127 return os;
128 };
129
130 std::ostream &operator<<(std::ostream &os,
131 const GenericWriteLogOperation &op) {
132 return op.format(os);
133 }
134
135 /* Called when the write log operation is appending and its log position is guaranteed */
136 void GenericWriteLogOperation::appending() {
137 Context *on_append = nullptr;
138 ldout(m_cct, 20) << __func__ << " " << this << dendl;
139 {
140 std::lock_guard locker(m_lock);
141 on_append = on_write_append;
142 on_write_append = nullptr;
143 }
144 if (on_append) {
145 ldout(m_cct, 20) << __func__ << " " << this << " on_append=" << on_append << dendl;
146 on_append->complete(0);
147 }
148 }
149
150 /* Called when the write log operation is completed in all log replicas */
151 void GenericWriteLogOperation::complete(int result) {
152 appending();
153 Context *on_persist = nullptr;
154 ldout(m_cct, 20) << __func__ << " " << this << dendl;
155 {
156 std::lock_guard locker(m_lock);
157 on_persist = on_write_persist;
158 on_write_persist = nullptr;
159 }
160 if (on_persist) {
161 ldout(m_cct, 20) << __func__ << " " << this << " on_persist=" << on_persist << dendl;
162 on_persist->complete(result);
163 }
164 }
165
166 WriteLogOperation::WriteLogOperation(WriteLogOperationSet &set,
167 uint64_t image_offset_bytes, uint64_t write_bytes,
168 CephContext *cct)
169 : GenericWriteLogOperation(set.sync_point, set.dispatch_time, set.perfcounter, cct),
170 log_entry(std::make_shared<WriteLogEntry>(set.sync_point->log_entry, image_offset_bytes, write_bytes)) {
171 on_write_append = set.extent_ops_appending->new_sub();
172 on_write_persist = set.extent_ops_persist->new_sub();
173 log_entry->sync_point_entry->writes++;
174 log_entry->sync_point_entry->bytes += write_bytes;
175 }
176
177 WriteLogOperation::~WriteLogOperation() { }
178
179 void WriteLogOperation::init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation, uint64_t current_sync_gen,
180 uint64_t last_op_sequence_num, bufferlist &write_req_bl, uint64_t buffer_offset,
181 bool persist_on_flush) {
182 log_entry->init(has_data, allocation, current_sync_gen, last_op_sequence_num, persist_on_flush);
183 buffer_alloc = &(*allocation);
184 bl.substr_of(write_req_bl, buffer_offset,
185 log_entry->write_bytes());
186 }
187
188 std::ostream &WriteLogOperation::format(std::ostream &os) const {
189 os << "(Write) ";
190 GenericWriteLogOperation::format(os);
191 os << ", ";
192 if (log_entry) {
193 os << "log_entry=[" << *log_entry << "], ";
194 } else {
195 os << "log_entry=nullptr, ";
196 }
197 os << "bl=[" << bl << "],"
198 << "buffer_alloc=" << buffer_alloc;
199 return os;
200 };
201
202 std::ostream &operator<<(std::ostream &os,
203 const WriteLogOperation &op) {
204 return op.format(os);
205 }
206
207
208 void WriteLogOperation::complete(int result) {
209 GenericWriteLogOperation::complete(result);
210 m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_buf_t, buf_persist_time - dispatch_time);
211 utime_t buf_lat = buf_persist_comp_time - buf_persist_time;
212 m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_bufc_t, buf_lat);
213 m_perfcounter->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist, buf_lat.to_nsec(),
214 log_entry->ram_entry.write_bytes);
215 m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_app_t, log_append_time - buf_persist_time);
216 }
217
218 void WriteLogOperation::copy_bl_to_pmem_buffer() {
219 /* operation is a shared_ptr, so write_op is only good as long as operation is in scope */
220 bufferlist::iterator i(&bl);
221 m_perfcounter->inc(l_librbd_rwl_log_op_bytes, log_entry->write_bytes());
222 ldout(m_cct, 20) << bl << dendl;
223 i.copy((unsigned)log_entry->write_bytes(), (char*)log_entry->pmem_buffer);
224 }
225
226 void WriteLogOperation::flush_pmem_buf_to_cache(PMEMobjpool *log_pool) {
227 buf_persist_time = ceph_clock_now();
228 pmemobj_flush(log_pool, log_entry->pmem_buffer, log_entry->write_bytes());
229 }
230
231 WriteLogOperationSet::WriteLogOperationSet(utime_t dispatched, PerfCounters *perfcounter, std::shared_ptr<SyncPoint> sync_point,
232 bool persist_on_flush, CephContext *cct, Context *on_finish)
233 : m_cct(cct), m_on_finish(on_finish),
234 persist_on_flush(persist_on_flush),
235 dispatch_time(dispatched),
236 perfcounter(perfcounter),
237 sync_point(sync_point) {
238 on_ops_appending = sync_point->prior_log_entries_persisted->new_sub();
239 on_ops_persist = nullptr;
240 extent_ops_persist =
241 new C_Gather(m_cct,
242 new LambdaContext( [this](int r) {
243 ldout(this->m_cct,20) << __func__ << " " << this << " m_extent_ops_persist completed" << dendl;
244 if (on_ops_persist) {
245 on_ops_persist->complete(r);
246 }
247 m_on_finish->complete(r);
248 }));
249 auto appending_persist_sub = extent_ops_persist->new_sub();
250 extent_ops_appending =
251 new C_Gather(m_cct,
252 new LambdaContext( [this, appending_persist_sub](int r) {
253 ldout(this->m_cct, 20) << __func__ << " " << this << " m_extent_ops_appending completed" << dendl;
254 on_ops_appending->complete(r);
255 appending_persist_sub->complete(r);
256 }));
257 }
258
259 WriteLogOperationSet::~WriteLogOperationSet() { }
260
261 std::ostream &operator<<(std::ostream &os,
262 const WriteLogOperationSet &s) {
263 os << "cell=" << (void*)s.cell << ", "
264 << "extent_ops_appending=[" << s.extent_ops_appending << ", "
265 << "extent_ops_persist=[" << s.extent_ops_persist << "]";
266 return os;
267 };
268
269 } // namespace rwl
270 } // namespace cache
271 } // namespace librbd