1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 * Ceph - scalable distributed file system
5 * Copyright (C) 2015 XSky <haomai@xsky.com>
7 * Author: Haomai Wang <haomaiwang@gmail.com>
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
15 #ifndef CEPH_MSG_DPDKSTACK_H
16 #define CEPH_MSG_DPDKSTACK_H
20 #include "common/ceph_context.h"
21 #include "common/Tub.h"
23 #include "msg/async/Stack.h"
33 template <typename Protocol
>
34 class NativeConnectedSocketImpl
;
36 // DPDKServerSocketImpl
// Listening-socket implementation for the DPDK stack: a ServerSocketImpl whose
// visible members all forward to a protocol-specific listener object.
// NOTE(review): this listing is a lossy extract -- the leading integer on each
// statement is its original line number and some original lines (access
// specifiers, one method header, closing braces) are not present.
37 template <typename Protocol
>
38 class DPDKServerSocketImpl
: public ServerSocketImpl
{
// Listener supplied by the protocol; the constructor defined later in this
// file initialises it from proto.listen(port).
39 typename
Protocol::listener _listener
;
// Constructor declaration; the definition appears after the class bodies.
41 DPDKServerSocketImpl(Protocol
& proto
, uint16_t port
, const SocketOptions
&opt
);
// Body of a member whose signature is not visible here; it returns the
// result of the listener's listen().
43 return _listener
.listen();
// Accepts a pending connection into *s; defined out of line below.
45 virtual int accept(ConnectedSocket
*s
, const SocketOptions
&opts
, entity_addr_t
*out
, Worker
*w
) override
;
// Defined out of line below; forwards to the listener's abort_accept().
46 virtual void abort_accept() override
;
// Returns whatever the listener reports from its own fd().
47 virtual int fd() const override
{
48 return _listener
.fd();
52 // NativeConnectedSocketImpl
// Connected-socket implementation backed by a protocol connection object.
// NOTE(review): at least one data member used by the methods below (`_buf`)
// is declared on a line that is not present in this listing.
53 template <typename Protocol
>
54 class NativeConnectedSocketImpl
: public ConnectedSocketImpl
{
// The underlying protocol connection; every I/O method goes through it.
55 typename
Protocol::connection _conn
;
// Index of the fragment of `_buf` that zero_copy_read() hands out next.
56 uint32_t _cur_frag
= 0;
// Offset passed to `_buf->share()` in zero_copy_read().
57 uint32_t _cur_off
= 0;
// Chunk obtained by zero_copy_read() that read() has not fully copied out yet.
59 Tub
<bufferptr
> _cache_ptr
;
62 explicit NativeConnectedSocketImpl(typename
Protocol::connection conn
)
63 : _conn(std::move(conn
)) {}
64 NativeConnectedSocketImpl(NativeConnectedSocketImpl
&&rhs
)
65 : _conn(std::move(rhs
._conn
)), _buf(std::move(rhs
.buf
)) {}
// Reports the connection state exactly as the underlying connection does.
66 virtual int is_connected() override
{
67 return _conn
.is_connected();
// Copying read: fills `buf` with up to `len` bytes taken from chunks obtained
// through zero_copy_read(), keeping any unconsumed remainder in _cache_ptr.
// The final return yields the number of bytes copied, or -EAGAIN when that
// number is zero.
// NOTE(review): the declarations of `left`, `off` and `r`, the enclosing loop
// and the handling of zero_copy_read()'s result are on lines that are not
// present in this listing.
70 virtual ssize_t
read(char *buf
, size_t len
) override
{
// Create the cache slot and fill it with the next chunk from the connection.
76 _cache_ptr
.construct();
77 r
= zero_copy_read(*_cache_ptr
);
// The cached chunk fits into the space still wanted: copy all of it and
// advance the output cursor.
85 if (_cache_ptr
->length() <= left
) {
86 _cache_ptr
->copy_out(0, _cache_ptr
->length(), buf
+off
);
87 left
-= _cache_ptr
->length();
88 off
+= _cache_ptr
->length();
// The cached chunk is larger than what is still wanted: copy `left` bytes and
// shrink the chunk from the front so the rest stays cached.
91 _cache_ptr
->copy_out(0, left
, buf
+off
);
92 _cache_ptr
->set_offset(_cache_ptr
->offset() + left
);
93 _cache_ptr
->set_length(_cache_ptr
->length() - left
);
// Bytes copied so far, or -EAGAIN if nothing was copied.
98 return len
- left
? len
- left
: -EAGAIN
;
// Hands the caller the current fragment of the pending packet as `data`
// without copying the payload, and returns data.length().
// NOTE(review): error handling, the condition guarding the packet fetch and
// the body of the last-fragment branch are on lines not present here.
101 virtual ssize_t
zero_copy_read(bufferptr
&data
) override
{
// Sample the connection's error state first (how it is acted on is not
// visible in this listing).
102 auto err
= _conn
.get_errno();
// Pull a packet from the connection into _buf.
// NOTE(review): the std::move looks redundant if read() returns by value --
// confirm against the connection's interface.
107 _buf
= std::move(_conn
.read());
// The fragment selected by the current cursor.
112 fragment
&f
= _buf
->frag(_cur_frag
);
// `p` comes from share(_cur_off, f.size) and is moved into the deleter below,
// presumably so the fragment's memory stays alive for as long as `data`
// refers to it.
113 Packet p
= _buf
->share(_cur_off
, f
.size
);
// The bound lambda does nothing; the binder exists only to own `p`.
114 auto del
= std::bind(
115 [](Packet
&p
) {}, std::move(p
));
// Wrap the fragment's memory (f.base, f.size) in a bufferptr carrying that
// deleter.
116 data
= buffer::claim_buffer(
117 f
.size
, f
.base
, make_deleter(std::move(del
)));
// Advance the fragment cursor; the branch is taken once the last fragment of
// the packet has been handed out.
118 if (++_cur_frag
== _buf
->nr_frags()) {
// A zero-length result is treated as a programming error.
125 assert(data
.length());
126 return data
.length();
// Sends as much of `bl` as the connection reports room for, as a single
// Packet whose fragments point into bl's buffers. Both visible exit paths
// return the result of _conn.send().
// NOTE(review): the declarations of `len`, `seglen` and `swapped`, the early
// exits and the loop's bookkeeping are on lines not present in this listing;
// `more` is not used on any visible line.
128 virtual ssize_t
send(bufferlist
&bl
, bool more
) override
{
// Sample the connection's error state first (handling not visible here).
129 auto err
= _conn
.get_errno();
// Amount of data the connection reports it can currently take.
133 size_t available
= _conn
.peek_sent_available();
// No room at all right now (branch body not visible).
134 if (available
== 0) {
// Fragments reference the bufferlist's memory; nothing is copied on the
// visible lines.
138 std::vector
<fragment
> frags
;
139 std::list
<bufferptr
>::const_iterator pb
= bl
.buffers().begin();
140 uint64_t left_pbrs
= bl
.buffers().size();
// Walk bl's buffers until the budget is used up or the buffers run out.
143 while (len
< available
&& left_pbrs
--) {
144 seglen
= pb
->length();
145 if (len
+ seglen
> available
) {
146 // stop here if at least one fragment is already queued, since there is no
147 // space available for the next ptr.
// Clamp the segment to the available budget.
150 seglen
= MIN(seglen
, available
);
153 frags
.push_back(fragment
{(char*)pb
->c_str(), seglen
});
// Only part of bl was queued: splice the queued prefix out of bl into
// `swapped` and let the Packet's deleter own it, presumably so those bytes
// stay valid until the Packet is released.
157 if (len
!= bl
.length()) {
159 bl
.splice(0, len
, &swapped
);
160 auto del
= std::bind(
161 [](bufferlist
&bl
) {}, std::move(swapped
));
162 return _conn
.send(Packet(std::move(frags
), make_deleter(std::move(del
))));
// All of bl was queued: move bl itself into the deleter instead.
164 auto del
= std::bind(
165 [](bufferlist
&bl
) {}, std::move(bl
));
167 return _conn
.send(Packet(std::move(frags
), make_deleter(std::move(del
))));
170 virtual void shutdown() override
{
173 // FIXME need to impl close
174 virtual void close() override
{
177 virtual int fd() const override
{
182 template <typename Protocol
>
183 DPDKServerSocketImpl
<Protocol
>::DPDKServerSocketImpl(
184 Protocol
& proto
, uint16_t port
, const SocketOptions
&opt
)
185 : _listener(proto
.listen(port
)) {}
// Accepts one connection from the listener, wraps it in a
// NativeConnectedSocketImpl and stores the resulting ConnectedSocket in *s.
// NOTE(review): the lines between the visible statements (including any
// "nothing to accept" check, any guard on `out`, and the final return) are
// not present in this listing; `options` and `w` are unused on visible lines.
187 template <typename Protocol
>
188 int DPDKServerSocketImpl
<Protocol
>::accept(ConnectedSocket
*s
, const SocketOptions
&options
, entity_addr_t
*out
, Worker
*w
) {
// A negative listener errno is returned to the caller unchanged.
189 if (_listener
.get_errno() < 0)
190 return _listener
.get_errno();
// `c` is dereferenced below, so accept() yields a pointer-like or
// optional-like handle to the new connection.
191 auto c
= _listener
.accept();
// Report the peer address to the caller.
196 *out
= c
->remote_addr();
// Move the accepted connection into a heap-allocated socket implementation
// and pass ownership of it to the ConnectedSocket assigned to *s.
197 std::unique_ptr
<NativeConnectedSocketImpl
<Protocol
>> csi(
198 new NativeConnectedSocketImpl
<Protocol
>(std::move(*c
)));
199 *s
= ConnectedSocket(std::move(csi
));
// Forwards the abort request straight to the listener.
203 template <typename Protocol
>
204 void DPDKServerSocketImpl
<Protocol
>::abort_accept() {
205 _listener
.abort_accept();
// Worker specialisation for the DPDK stack.
// NOTE(review): this listing is a lossy extract; the nested `Impl` type's
// header, several of its members and all closing braces are not present.
208 class DPDKWorker
: public Worker
{
// Shared handle to the DPDK device (appears to belong to the nested Impl).
212 std::shared_ptr
<DPDKDevice
> _dev
;
// Impl constructor declaration; defined elsewhere.
214 Impl(CephContext
*cct
, unsigned i
, EventCenter
*c
, std::shared_ptr
<DPDKDevice
> dev
);
// Detaches this worker's queue (`id`) from the device; the enclosing function
// is not visible -- presumably Impl's destructor, TODO confirm.
216 _dev
->unset_local_queue(id
);
// Owning pointer to the Impl object that the inline methods below use.
219 std::unique_ptr
<Impl
> _impl
;
// NOTE(review): the other virtuals in this class are marked `override`;
// check whether this one overrides a Worker method too.
221 virtual void initialize();
// Installs `filter` on the Impl's `_inet` member.
222 void set_ipv4_packet_filter(ip_packet_filter
* filter
) {
223 _impl
->_inet
.set_packet_filter(filter
);
// TCP instantiated with ipv4_traits; used in the friend declaration below.
225 using tcp4
= tcp
<ipv4_traits
>;
228 explicit DPDKWorker(CephContext
*c
, unsigned i
): Worker(c
, i
) {}
// Worker interface overrides; both are defined outside this listing.
229 virtual int listen(entity_addr_t
&addr
, const SocketOptions
&opts
, ServerSocket
*) override
;
230 virtual int connect(const entity_addr_t
&addr
, const SocketOptions
&opts
, ConnectedSocket
*socket
) override
;
// Passes an (ethernet address, IPv4 address) pair to the Impl's `_inet`
// member via learn().
231 void arp_learn(ethernet_address l2
, ipv4_address l3
) {
232 _impl
->_inet
.learn(l2
, l3
);
// Body not present in this listing.
234 virtual void destroy() override
{
// Grants the tcp4 server-socket implementation access to private members.
238 friend class DPDKServerSocketImpl
<tcp4
>;
// NetworkStack implementation for DPDK.
// NOTE(review): access specifiers and closing braces are not present in this
// listing.
241 class DPDKStack
: public NetworkStack
{
// One callable slot per worker thread; presumably filled by spawn_worker()
// -- TODO confirm against its definition.
242 vector
<std::function
<void()> > funcs
;
// Passes `cct` and `t` to NetworkStack, then sizes `funcs` to the configured
// ms_async_max_op_threads.
244 explicit DPDKStack(CephContext
*cct
, const string
&t
): NetworkStack(cct
, t
) {
245 funcs
.resize(cct
->_conf
->ms_async_max_op_threads
);
247 virtual bool support_zero_copy_read() const override
{ return true; }
248 virtual bool support_local_listen_table() const { return true; }
250 virtual void spawn_worker(unsigned i
, std::function
<void ()> &&func
) override
;
251 virtual void join_worker(unsigned i
) override
{
252 dpdk::eal::execute_on_master([&]() {
253 rte_eal_wait_lcore(i
+1);