]>
git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_ggate/Server.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/debug.h"
5 #include "common/errno.h"
10 #define dout_context g_ceph_context
11 #define dout_subsys ceph_subsys_rbd
13 #define dout_prefix *_dout << "rbd::ggate::Server: " << this \
14 << " " << __func__ << ": "
19 Server::Server(Driver
*drv
, librbd::Image
& image
)
20 : m_drv(drv
), m_image(image
), m_lock("rbd::ggate::Server::m_lock"),
21 m_reader_thread(this, &Server::reader_entry
),
22 m_writer_thread(this, &Server::writer_entry
) {
31 dout(20) << "entering run loop" << dendl
;
34 Mutex::Locker
locker(m_lock
);
36 m_cond
.WaitInterval(m_lock
, utime_t(1, 0));
40 dout(20) << "exiting run loop" << dendl
;
48 m_reader_thread
.create("rbd_reader");
49 m_writer_thread
.create("rbd_writer");
57 Mutex::Locker
locker(m_lock
);
61 m_reader_thread
.join();
62 m_writer_thread
.join();
67 void Server::io_start(IOContext
*ctx
) {
68 dout(20) << ctx
<< dendl
;
70 Mutex::Locker
locker(m_lock
);
71 m_io_pending
.push_back(&ctx
->item
);
74 void Server::io_finish(IOContext
*ctx
) {
75 dout(20) << ctx
<< dendl
;
77 Mutex::Locker
locker(m_lock
);
78 assert(ctx
->item
.is_on_list());
80 ctx
->item
.remove_myself();
81 m_io_finished
.push_back(&ctx
->item
);
85 Server::IOContext
*Server::wait_io_finish() {
88 Mutex::Locker
locker(m_lock
);
90 while (m_io_finished
.empty() && !m_stopping
) {
94 if (m_io_finished
.empty()) {
98 IOContext
*ret
= m_io_finished
.front();
99 m_io_finished
.pop_front();
104 void Server::wait_clean() {
107 assert(!m_reader_thread
.is_started());
109 Mutex::Locker
locker(m_lock
);
111 while (!m_io_pending
.empty()) {
115 while (!m_io_finished
.empty()) {
116 ceph::unique_ptr
<IOContext
> free_ctx(m_io_finished
.front());
117 m_io_finished
.pop_front();
121 void Server::aio_callback(librbd::completion_t cb
, void *arg
) {
122 librbd::RBD::AioCompletion
*aio_completion
=
123 reinterpret_cast<librbd::RBD::AioCompletion
*>(cb
);
125 IOContext
*ctx
= reinterpret_cast<IOContext
*>(arg
);
126 int r
= aio_completion
->get_return_value();
128 ctx
->server
->handle_aio(ctx
, r
);
129 aio_completion
->release();
132 void Server::handle_aio(IOContext
*ctx
, int r
) {
133 dout(20) << ctx
<< ": r=" << r
<< dendl
;
136 // if shrinking an image, a pagecache writeback might reference
137 // extents outside of the range of the new image extents
138 dout(5) << "masking IO out-of-bounds error" << dendl
;
139 ctx
->req
->bl
.clear();
144 ctx
->req
->set_error(-r
);
145 } else if ((ctx
->req
->get_cmd() == Request::Read
) &&
146 r
!= static_cast<int>(ctx
->req
->get_length())) {
147 int pad_byte_count
= static_cast<int> (ctx
->req
->get_length()) - r
;
148 ctx
->req
->bl
.append_zero(pad_byte_count
);
149 dout(20) << ctx
<< ": pad byte count: " << pad_byte_count
<< dendl
;
150 ctx
->req
->set_error(0);
152 ctx
->req
->set_error(0);
157 void Server::reader_entry() {
160 while (!m_stopping
) {
161 ceph::unique_ptr
<IOContext
> ctx(new IOContext(this));
163 dout(20) << "waiting for ggate request" << dendl
;
165 int r
= m_drv
->recv(&ctx
->req
);
167 if (r
!= -ECANCELED
) {
168 derr
<< "recv: " << cpp_strerror(r
) << dendl
;
170 Mutex::Locker
locker(m_lock
);
176 IOContext
*pctx
= ctx
.release();
178 dout(20) << pctx
<< ": start: " << *pctx
<< dendl
;
181 librbd::RBD::AioCompletion
*c
=
182 new librbd::RBD::AioCompletion(pctx
, aio_callback
);
183 switch (pctx
->req
->get_cmd())
185 case rbd::ggate::Request::Write
:
186 m_image
.aio_write(pctx
->req
->get_offset(), pctx
->req
->get_length(),
189 case rbd::ggate::Request::Read
:
190 m_image
.aio_read(pctx
->req
->get_offset(), pctx
->req
->get_length(),
193 case rbd::ggate::Request::Flush
:
194 m_image
.aio_flush(c
);
196 case rbd::ggate::Request::Discard
:
197 m_image
.aio_discard(pctx
->req
->get_offset(), pctx
->req
->get_length(), c
);
200 derr
<< pctx
<< ": invalid request command: " << pctx
->req
->get_cmd()
203 Mutex::Locker
locker(m_lock
);
209 dout(20) << "terminated" << dendl
;
212 void Server::writer_entry() {
215 while (!m_stopping
) {
216 dout(20) << "waiting for io request" << dendl
;
218 ceph::unique_ptr
<IOContext
> ctx(wait_io_finish());
220 dout(20) << "no io requests, terminating" << dendl
;
224 dout(20) << ctx
.get() << ": got: " << *ctx
<< dendl
;
226 int r
= m_drv
->send(ctx
->req
);
228 derr
<< ctx
.get() << ": send: " << cpp_strerror(r
) << dendl
;
229 Mutex::Locker
locker(m_lock
);
234 dout(20) << ctx
.get() << " finish" << dendl
;
236 dout(20) << "terminated" << dendl
;
239 std::ostream
&operator<<(std::ostream
&os
, const Server::IOContext
&ctx
) {
241 os
<< "[" << ctx
.req
->get_id();
243 switch (ctx
.req
->get_cmd())
245 case rbd::ggate::Request::Write
:
248 case rbd::ggate::Request::Read
:
251 case rbd::ggate::Request::Flush
:
254 case rbd::ggate::Request::Discard
:
258 os
<< " Unknow(" << ctx
.req
->get_cmd() << ") ";
262 os
<< ctx
.req
->get_offset() << "~" << ctx
.req
->get_length() << " "
263 << ctx
.req
->get_error() << "]";