]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_ggate/Server.cc
update sources to v12.1.3
[ceph.git] / ceph / src / tools / rbd_ggate / Server.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "common/debug.h"
5 #include "common/errno.h"
6 #include "Driver.h"
7 #include "Server.h"
8 #include "Request.h"
9
10 #define dout_context g_ceph_context
11 #define dout_subsys ceph_subsys_rbd
12 #undef dout_prefix
13 #define dout_prefix *_dout << "rbd::ggate::Server: " << this \
14 << " " << __func__ << ": "
15
16 namespace rbd {
17 namespace ggate {
18
19 Server::Server(Driver *drv, librbd::Image& image)
20 : m_drv(drv), m_image(image), m_lock("rbd::ggate::Server::m_lock"),
21 m_reader_thread(this, &Server::reader_entry),
22 m_writer_thread(this, &Server::writer_entry) {
23 }
24
25 void Server::run() {
26 dout(10) << dendl;
27
28 int r = start();
29 assert(r == 0);
30
31 dout(20) << "entering run loop" << dendl;
32
33 {
34 Mutex::Locker locker(m_lock);
35 while (!m_stopping) {
36 m_cond.WaitInterval(m_lock, utime_t(1, 0));
37 }
38 }
39
40 dout(20) << "exiting run loop" << dendl;
41
42 stop();
43 }
44
45 int Server::start() {
46 dout(10) << dendl;
47
48 m_reader_thread.create("rbd_reader");
49 m_writer_thread.create("rbd_writer");
50 return 0;
51 }
52
53 void Server::stop() {
54 dout(10) << dendl;
55
56 {
57 Mutex::Locker locker(m_lock);
58 assert(m_stopping);
59 }
60
61 m_reader_thread.join();
62 m_writer_thread.join();
63
64 wait_clean();
65 }
66
67 void Server::io_start(IOContext *ctx) {
68 dout(20) << ctx << dendl;
69
70 Mutex::Locker locker(m_lock);
71 m_io_pending.push_back(&ctx->item);
72 }
73
74 void Server::io_finish(IOContext *ctx) {
75 dout(20) << ctx << dendl;
76
77 Mutex::Locker locker(m_lock);
78 assert(ctx->item.is_on_list());
79
80 ctx->item.remove_myself();
81 m_io_finished.push_back(&ctx->item);
82 m_cond.Signal();
83 }
84
85 Server::IOContext *Server::wait_io_finish() {
86 dout(20) << dendl;
87
88 Mutex::Locker locker(m_lock);
89
90 while (m_io_finished.empty() && !m_stopping) {
91 m_cond.Wait(m_lock);
92 }
93
94 if (m_io_finished.empty()) {
95 return nullptr;
96 }
97
98 IOContext *ret = m_io_finished.front();
99 m_io_finished.pop_front();
100
101 return ret;
102 }
103
104 void Server::wait_clean() {
105 dout(20) << dendl;
106
107 assert(!m_reader_thread.is_started());
108
109 Mutex::Locker locker(m_lock);
110
111 while (!m_io_pending.empty()) {
112 m_cond.Wait(m_lock);
113 }
114
115 while (!m_io_finished.empty()) {
116 ceph::unique_ptr<IOContext> free_ctx(m_io_finished.front());
117 m_io_finished.pop_front();
118 }
119 }
120
121 void Server::aio_callback(librbd::completion_t cb, void *arg) {
122 librbd::RBD::AioCompletion *aio_completion =
123 reinterpret_cast<librbd::RBD::AioCompletion*>(cb);
124
125 IOContext *ctx = reinterpret_cast<IOContext *>(arg);
126 int r = aio_completion->get_return_value();
127
128 ctx->server->handle_aio(ctx, r);
129 aio_completion->release();
130 }
131
132 void Server::handle_aio(IOContext *ctx, int r) {
133 dout(20) << ctx << ": r=" << r << dendl;
134
135 if (r == -EINVAL) {
136 // if shrinking an image, a pagecache writeback might reference
137 // extents outside of the range of the new image extents
138 dout(5) << "masking IO out-of-bounds error" << dendl;
139 ctx->req->bl.clear();
140 r = 0;
141 }
142
143 if (r < 0) {
144 ctx->req->set_error(-r);
145 } else if ((ctx->req->get_cmd() == Request::Read) &&
146 r != static_cast<int>(ctx->req->get_length())) {
147 int pad_byte_count = static_cast<int> (ctx->req->get_length()) - r;
148 ctx->req->bl.append_zero(pad_byte_count);
149 dout(20) << ctx << ": pad byte count: " << pad_byte_count << dendl;
150 ctx->req->set_error(0);
151 } else {
152 ctx->req->set_error(0);
153 }
154 io_finish(ctx);
155 }
156
157 void Server::reader_entry() {
158 dout(20) << dendl;
159
160 while (!m_stopping) {
161 ceph::unique_ptr<IOContext> ctx(new IOContext(this));
162
163 dout(20) << "waiting for ggate request" << dendl;
164
165 int r = m_drv->recv(&ctx->req);
166 if (r < 0) {
167 if (r != -ECANCELED) {
168 derr << "recv: " << cpp_strerror(r) << dendl;
169 }
170 Mutex::Locker locker(m_lock);
171 m_stopping = true;
172 m_cond.Signal();
173 return;
174 }
175
176 IOContext *pctx = ctx.release();
177
178 dout(20) << pctx << ": start: " << *pctx << dendl;
179
180 io_start(pctx);
181 librbd::RBD::AioCompletion *c =
182 new librbd::RBD::AioCompletion(pctx, aio_callback);
183 switch (pctx->req->get_cmd())
184 {
185 case rbd::ggate::Request::Write:
186 m_image.aio_write(pctx->req->get_offset(), pctx->req->get_length(),
187 pctx->req->bl, c);
188 break;
189 case rbd::ggate::Request::Read:
190 m_image.aio_read(pctx->req->get_offset(), pctx->req->get_length(),
191 pctx->req->bl, c);
192 break;
193 case rbd::ggate::Request::Flush:
194 m_image.aio_flush(c);
195 break;
196 case rbd::ggate::Request::Discard:
197 m_image.aio_discard(pctx->req->get_offset(), pctx->req->get_length(), c);
198 break;
199 default:
200 derr << pctx << ": invalid request command: " << pctx->req->get_cmd()
201 << dendl;
202 c->release();
203 Mutex::Locker locker(m_lock);
204 m_stopping = true;
205 m_cond.Signal();
206 return;
207 }
208 }
209 dout(20) << "terminated" << dendl;
210 }
211
212 void Server::writer_entry() {
213 dout(20) << dendl;
214
215 while (!m_stopping) {
216 dout(20) << "waiting for io request" << dendl;
217
218 ceph::unique_ptr<IOContext> ctx(wait_io_finish());
219 if (!ctx) {
220 dout(20) << "no io requests, terminating" << dendl;
221 return;
222 }
223
224 dout(20) << ctx.get() << ": got: " << *ctx << dendl;
225
226 int r = m_drv->send(ctx->req);
227 if (r < 0) {
228 derr << ctx.get() << ": send: " << cpp_strerror(r) << dendl;
229 Mutex::Locker locker(m_lock);
230 m_stopping = true;
231 m_cond.Signal();
232 return;
233 }
234 dout(20) << ctx.get() << " finish" << dendl;
235 }
236 dout(20) << "terminated" << dendl;
237 }
238
239 std::ostream &operator<<(std::ostream &os, const Server::IOContext &ctx) {
240
241 os << "[" << ctx.req->get_id();
242
243 switch (ctx.req->get_cmd())
244 {
245 case rbd::ggate::Request::Write:
246 os << " Write ";
247 break;
248 case rbd::ggate::Request::Read:
249 os << " Read ";
250 break;
251 case rbd::ggate::Request::Flush:
252 os << " Flush ";
253 break;
254 case rbd::ggate::Request::Discard:
255 os << " Discard ";
256 break;
257 default:
258 os << " Unknow(" << ctx.req->get_cmd() << ") ";
259 break;
260 }
261
262 os << ctx.req->get_offset() << "~" << ctx.req->get_length() << " "
263 << ctx.req->get_error() << "]";
264
265 return os;
266 }
267
268 } // namespace ggate
269 } // namespace rbd
270