]>
Commit | Line | Data |
---|---|---|
d2e6a577 FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "common/debug.h" | |
5 | #include "common/errno.h" | |
6 | #include "Driver.h" | |
7 | #include "Server.h" | |
8 | #include "Request.h" | |
9 | ||
10 | #define dout_context g_ceph_context | |
11 | #define dout_subsys ceph_subsys_rbd | |
12 | #undef dout_prefix | |
13 | #define dout_prefix *_dout << "rbd::ggate::Server: " << this \ | |
14 | << " " << __func__ << ": " | |
15 | ||
16 | namespace rbd { | |
17 | namespace ggate { | |
18 | ||
19 | Server::Server(Driver *drv, librbd::Image& image) | |
20 | : m_drv(drv), m_image(image), m_lock("rbd::ggate::Server::m_lock"), | |
21 | m_reader_thread(this, &Server::reader_entry), | |
22 | m_writer_thread(this, &Server::writer_entry) { | |
23 | } | |
24 | ||
25 | void Server::run() { | |
26 | dout(10) << dendl; | |
27 | ||
28 | int r = start(); | |
11fdf7f2 | 29 | ceph_assert(r == 0); |
d2e6a577 FG |
30 | |
31 | dout(20) << "entering run loop" << dendl; | |
32 | ||
33 | { | |
34 | Mutex::Locker locker(m_lock); | |
35 | while (!m_stopping) { | |
36 | m_cond.WaitInterval(m_lock, utime_t(1, 0)); | |
37 | } | |
38 | } | |
39 | ||
40 | dout(20) << "exiting run loop" << dendl; | |
41 | ||
42 | stop(); | |
43 | } | |
44 | ||
45 | int Server::start() { | |
46 | dout(10) << dendl; | |
47 | ||
48 | m_reader_thread.create("rbd_reader"); | |
49 | m_writer_thread.create("rbd_writer"); | |
50 | return 0; | |
51 | } | |
52 | ||
53 | void Server::stop() { | |
54 | dout(10) << dendl; | |
55 | ||
56 | { | |
57 | Mutex::Locker locker(m_lock); | |
11fdf7f2 | 58 | ceph_assert(m_stopping); |
d2e6a577 FG |
59 | } |
60 | ||
61 | m_reader_thread.join(); | |
62 | m_writer_thread.join(); | |
63 | ||
64 | wait_clean(); | |
65 | } | |
66 | ||
67 | void Server::io_start(IOContext *ctx) { | |
68 | dout(20) << ctx << dendl; | |
69 | ||
70 | Mutex::Locker locker(m_lock); | |
71 | m_io_pending.push_back(&ctx->item); | |
72 | } | |
73 | ||
74 | void Server::io_finish(IOContext *ctx) { | |
75 | dout(20) << ctx << dendl; | |
76 | ||
77 | Mutex::Locker locker(m_lock); | |
11fdf7f2 | 78 | ceph_assert(ctx->item.is_on_list()); |
d2e6a577 FG |
79 | |
80 | ctx->item.remove_myself(); | |
81 | m_io_finished.push_back(&ctx->item); | |
82 | m_cond.Signal(); | |
83 | } | |
84 | ||
85 | Server::IOContext *Server::wait_io_finish() { | |
86 | dout(20) << dendl; | |
87 | ||
88 | Mutex::Locker locker(m_lock); | |
89 | ||
90 | while (m_io_finished.empty() && !m_stopping) { | |
91 | m_cond.Wait(m_lock); | |
92 | } | |
93 | ||
94 | if (m_io_finished.empty()) { | |
95 | return nullptr; | |
96 | } | |
97 | ||
98 | IOContext *ret = m_io_finished.front(); | |
99 | m_io_finished.pop_front(); | |
100 | ||
101 | return ret; | |
102 | } | |
103 | ||
104 | void Server::wait_clean() { | |
105 | dout(20) << dendl; | |
106 | ||
11fdf7f2 | 107 | ceph_assert(!m_reader_thread.is_started()); |
d2e6a577 FG |
108 | |
109 | Mutex::Locker locker(m_lock); | |
110 | ||
111 | while (!m_io_pending.empty()) { | |
112 | m_cond.Wait(m_lock); | |
113 | } | |
114 | ||
115 | while (!m_io_finished.empty()) { | |
11fdf7f2 | 116 | std::unique_ptr<IOContext> free_ctx(m_io_finished.front()); |
d2e6a577 FG |
117 | m_io_finished.pop_front(); |
118 | } | |
119 | } | |
120 | ||
121 | void Server::aio_callback(librbd::completion_t cb, void *arg) { | |
122 | librbd::RBD::AioCompletion *aio_completion = | |
123 | reinterpret_cast<librbd::RBD::AioCompletion*>(cb); | |
124 | ||
125 | IOContext *ctx = reinterpret_cast<IOContext *>(arg); | |
126 | int r = aio_completion->get_return_value(); | |
127 | ||
128 | ctx->server->handle_aio(ctx, r); | |
129 | aio_completion->release(); | |
130 | } | |
131 | ||
132 | void Server::handle_aio(IOContext *ctx, int r) { | |
133 | dout(20) << ctx << ": r=" << r << dendl; | |
134 | ||
135 | if (r == -EINVAL) { | |
136 | // if shrinking an image, a pagecache writeback might reference | |
137 | // extents outside of the range of the new image extents | |
138 | dout(5) << "masking IO out-of-bounds error" << dendl; | |
139 | ctx->req->bl.clear(); | |
140 | r = 0; | |
141 | } | |
142 | ||
143 | if (r < 0) { | |
144 | ctx->req->set_error(-r); | |
145 | } else if ((ctx->req->get_cmd() == Request::Read) && | |
146 | r != static_cast<int>(ctx->req->get_length())) { | |
147 | int pad_byte_count = static_cast<int> (ctx->req->get_length()) - r; | |
148 | ctx->req->bl.append_zero(pad_byte_count); | |
149 | dout(20) << ctx << ": pad byte count: " << pad_byte_count << dendl; | |
150 | ctx->req->set_error(0); | |
151 | } else { | |
152 | ctx->req->set_error(0); | |
153 | } | |
154 | io_finish(ctx); | |
155 | } | |
156 | ||
157 | void Server::reader_entry() { | |
158 | dout(20) << dendl; | |
159 | ||
160 | while (!m_stopping) { | |
11fdf7f2 | 161 | std::unique_ptr<IOContext> ctx(new IOContext(this)); |
d2e6a577 FG |
162 | |
163 | dout(20) << "waiting for ggate request" << dendl; | |
164 | ||
165 | int r = m_drv->recv(&ctx->req); | |
166 | if (r < 0) { | |
167 | if (r != -ECANCELED) { | |
168 | derr << "recv: " << cpp_strerror(r) << dendl; | |
169 | } | |
170 | Mutex::Locker locker(m_lock); | |
171 | m_stopping = true; | |
172 | m_cond.Signal(); | |
173 | return; | |
174 | } | |
175 | ||
176 | IOContext *pctx = ctx.release(); | |
177 | ||
178 | dout(20) << pctx << ": start: " << *pctx << dendl; | |
179 | ||
180 | io_start(pctx); | |
181 | librbd::RBD::AioCompletion *c = | |
182 | new librbd::RBD::AioCompletion(pctx, aio_callback); | |
183 | switch (pctx->req->get_cmd()) | |
184 | { | |
185 | case rbd::ggate::Request::Write: | |
186 | m_image.aio_write(pctx->req->get_offset(), pctx->req->get_length(), | |
187 | pctx->req->bl, c); | |
188 | break; | |
189 | case rbd::ggate::Request::Read: | |
190 | m_image.aio_read(pctx->req->get_offset(), pctx->req->get_length(), | |
191 | pctx->req->bl, c); | |
192 | break; | |
193 | case rbd::ggate::Request::Flush: | |
194 | m_image.aio_flush(c); | |
195 | break; | |
196 | case rbd::ggate::Request::Discard: | |
197 | m_image.aio_discard(pctx->req->get_offset(), pctx->req->get_length(), c); | |
198 | break; | |
199 | default: | |
200 | derr << pctx << ": invalid request command: " << pctx->req->get_cmd() | |
201 | << dendl; | |
202 | c->release(); | |
203 | Mutex::Locker locker(m_lock); | |
204 | m_stopping = true; | |
205 | m_cond.Signal(); | |
206 | return; | |
207 | } | |
208 | } | |
209 | dout(20) << "terminated" << dendl; | |
210 | } | |
211 | ||
212 | void Server::writer_entry() { | |
213 | dout(20) << dendl; | |
214 | ||
215 | while (!m_stopping) { | |
216 | dout(20) << "waiting for io request" << dendl; | |
217 | ||
11fdf7f2 | 218 | std::unique_ptr<IOContext> ctx(wait_io_finish()); |
d2e6a577 FG |
219 | if (!ctx) { |
220 | dout(20) << "no io requests, terminating" << dendl; | |
221 | return; | |
222 | } | |
223 | ||
224 | dout(20) << ctx.get() << ": got: " << *ctx << dendl; | |
225 | ||
226 | int r = m_drv->send(ctx->req); | |
227 | if (r < 0) { | |
228 | derr << ctx.get() << ": send: " << cpp_strerror(r) << dendl; | |
229 | Mutex::Locker locker(m_lock); | |
230 | m_stopping = true; | |
231 | m_cond.Signal(); | |
232 | return; | |
233 | } | |
234 | dout(20) << ctx.get() << " finish" << dendl; | |
235 | } | |
236 | dout(20) << "terminated" << dendl; | |
237 | } | |
238 | ||
239 | std::ostream &operator<<(std::ostream &os, const Server::IOContext &ctx) { | |
240 | ||
241 | os << "[" << ctx.req->get_id(); | |
242 | ||
243 | switch (ctx.req->get_cmd()) | |
244 | { | |
245 | case rbd::ggate::Request::Write: | |
246 | os << " Write "; | |
247 | break; | |
248 | case rbd::ggate::Request::Read: | |
249 | os << " Read "; | |
250 | break; | |
251 | case rbd::ggate::Request::Flush: | |
252 | os << " Flush "; | |
253 | break; | |
254 | case rbd::ggate::Request::Discard: | |
255 | os << " Discard "; | |
256 | break; | |
257 | default: | |
258 | os << " Unknow(" << ctx.req->get_cmd() << ") "; | |
259 | break; | |
260 | } | |
261 | ||
262 | os << ctx.req->get_offset() << "~" << ctx.req->get_length() << " " | |
263 | << ctx.req->get_error() << "]"; | |
264 | ||
265 | return os; | |
266 | } | |
267 | ||
268 | } // namespace ggate | |
269 | } // namespace rbd | |
270 |