]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | /* | |
3 | * Ceph - scalable distributed file system | |
4 | * | |
5 | * Copyright (C) 2015 XSky <haomai@xsky.com> | |
6 | * | |
7 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | #ifndef CEPH_MSG_DPDKSTACK_H | |
16 | #define CEPH_MSG_DPDKSTACK_H | |
17 | ||
18 | #include <functional> | |
20effc67 | 19 | #include <optional> |
7c673cae FG |
20 | |
21 | #include "common/ceph_context.h" | |
7c673cae FG |
22 | |
23 | #include "msg/async/Stack.h" | |
7c673cae FG |
24 | #include "net.h" |
25 | #include "const.h" | |
26 | #include "IP.h" | |
27 | #include "Packet.h" | |
20effc67 | 28 | #include "dpdk_rte.h" |
7c673cae FG |
29 | |
30 | class interface; | |
31 | ||
32 | template <typename Protocol> | |
33 | class NativeConnectedSocketImpl; | |
34 | ||
35 | // DPDKServerSocketImpl | |
36 | template <typename Protocol> | |
37 | class DPDKServerSocketImpl : public ServerSocketImpl { | |
38 | typename Protocol::listener _listener; | |
39 | public: | |
11fdf7f2 | 40 | DPDKServerSocketImpl(Protocol& proto, uint16_t port, const SocketOptions &opt, |
9f95a23c | 41 | int type, unsigned addr_slot); |
7c673cae FG |
42 | int listen() { |
43 | return _listener.listen(); | |
44 | } | |
45 | virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; | |
46 | virtual void abort_accept() override; | |
47 | virtual int fd() const override { | |
48 | return _listener.fd(); | |
49 | } | |
50 | }; | |
51 | ||
52 | // NativeConnectedSocketImpl | |
53 | template <typename Protocol> | |
54 | class NativeConnectedSocketImpl : public ConnectedSocketImpl { | |
55 | typename Protocol::connection _conn; | |
56 | uint32_t _cur_frag = 0; | |
57 | uint32_t _cur_off = 0; | |
20effc67 TL |
58 | std::optional<Packet> _buf; |
59 | std::optional<bufferptr> _cache_ptr; | |
7c673cae FG |
60 | |
61 | public: | |
62 | explicit NativeConnectedSocketImpl(typename Protocol::connection conn) | |
63 | : _conn(std::move(conn)) {} | |
64 | NativeConnectedSocketImpl(NativeConnectedSocketImpl &&rhs) | |
65 | : _conn(std::move(rhs._conn)), _buf(std::move(rhs.buf)) {} | |
66 | virtual int is_connected() override { | |
67 | return _conn.is_connected(); | |
68 | } | |
69 | ||
70 | virtual ssize_t read(char *buf, size_t len) override { | |
71 | size_t left = len; | |
72 | ssize_t r = 0; | |
73 | size_t off = 0; | |
74 | while (left > 0) { | |
75 | if (!_cache_ptr) { | |
20effc67 | 76 | _cache_ptr.emplace(); |
7c673cae FG |
77 | r = zero_copy_read(*_cache_ptr); |
78 | if (r <= 0) { | |
20effc67 | 79 | _cache_ptr.reset(); |
7c673cae FG |
80 | if (r == -EAGAIN) |
81 | break; | |
82 | return r; | |
83 | } | |
84 | } | |
85 | if (_cache_ptr->length() <= left) { | |
86 | _cache_ptr->copy_out(0, _cache_ptr->length(), buf+off); | |
87 | left -= _cache_ptr->length(); | |
88 | off += _cache_ptr->length(); | |
20effc67 | 89 | _cache_ptr.reset(); |
7c673cae FG |
90 | } else { |
91 | _cache_ptr->copy_out(0, left, buf+off); | |
92 | _cache_ptr->set_offset(_cache_ptr->offset() + left); | |
93 | _cache_ptr->set_length(_cache_ptr->length() - left); | |
94 | left = 0; | |
95 | break; | |
96 | } | |
97 | } | |
98 | return len - left ? len - left : -EAGAIN; | |
99 | } | |
100 | ||
9f95a23c TL |
101 | private: |
102 | ssize_t zero_copy_read(bufferptr &data) { | |
7c673cae FG |
103 | auto err = _conn.get_errno(); |
104 | if (err <= 0) | |
105 | return err; | |
106 | ||
107 | if (!_buf) { | |
108 | _buf = std::move(_conn.read()); | |
109 | if (!_buf) | |
110 | return -EAGAIN; | |
111 | } | |
112 | ||
113 | fragment &f = _buf->frag(_cur_frag); | |
114 | Packet p = _buf->share(_cur_off, f.size); | |
115 | auto del = std::bind( | |
116 | [](Packet &p) {}, std::move(p)); | |
117 | data = buffer::claim_buffer( | |
118 | f.size, f.base, make_deleter(std::move(del))); | |
119 | if (++_cur_frag == _buf->nr_frags()) { | |
120 | _cur_frag = 0; | |
121 | _cur_off = 0; | |
20effc67 | 122 | _buf.reset(); |
7c673cae FG |
123 | } else { |
124 | _cur_off += f.size; | |
125 | } | |
11fdf7f2 | 126 | ceph_assert(data.length()); |
7c673cae FG |
127 | return data.length(); |
128 | } | |
129 | virtual ssize_t send(bufferlist &bl, bool more) override { | |
130 | auto err = _conn.get_errno(); | |
131 | if (err < 0) | |
132 | return (ssize_t)err; | |
133 | ||
134 | size_t available = _conn.peek_sent_available(); | |
135 | if (available == 0) { | |
136 | return 0; | |
137 | } | |
138 | ||
139 | std::vector<fragment> frags; | |
9f95a23c | 140 | auto pb = bl.buffers().begin(); |
7c673cae FG |
141 | uint64_t len = 0; |
142 | uint64_t seglen = 0; | |
9f95a23c | 143 | while (len < available && pb != bl.buffers().end()) { |
7c673cae | 144 | seglen = pb->length(); |
9f95a23c TL |
145 | // Buffer length is zero, no need to send, so skip it |
146 | if (seglen == 0) { | |
147 | ++pb; | |
148 | continue; | |
149 | } | |
7c673cae FG |
150 | if (len + seglen > available) { |
151 | // don't continue if we enough at least 1 fragment since no available | |
152 | // space for next ptr. | |
153 | if (len > 0) | |
154 | break; | |
11fdf7f2 | 155 | seglen = std::min(seglen, available); |
7c673cae FG |
156 | } |
157 | len += seglen; | |
158 | frags.push_back(fragment{(char*)pb->c_str(), seglen}); | |
159 | ++pb; | |
160 | } | |
161 | ||
162 | if (len != bl.length()) { | |
163 | bufferlist swapped; | |
164 | bl.splice(0, len, &swapped); | |
165 | auto del = std::bind( | |
166 | [](bufferlist &bl) {}, std::move(swapped)); | |
167 | return _conn.send(Packet(std::move(frags), make_deleter(std::move(del)))); | |
168 | } else { | |
169 | auto del = std::bind( | |
170 | [](bufferlist &bl) {}, std::move(bl)); | |
171 | ||
172 | return _conn.send(Packet(std::move(frags), make_deleter(std::move(del)))); | |
173 | } | |
174 | } | |
9f95a23c TL |
175 | |
176 | public: | |
7c673cae FG |
177 | virtual void shutdown() override { |
178 | _conn.close_write(); | |
179 | } | |
180 | // FIXME need to impl close | |
181 | virtual void close() override { | |
182 | _conn.close_write(); | |
183 | } | |
184 | virtual int fd() const override { | |
185 | return _conn.fd(); | |
186 | } | |
187 | }; | |
188 | ||
189 | template <typename Protocol> | |
190 | DPDKServerSocketImpl<Protocol>::DPDKServerSocketImpl( | |
9f95a23c TL |
191 | Protocol& proto, uint16_t port, const SocketOptions &opt, |
192 | int type, unsigned addr_slot) | |
193 | : ServerSocketImpl(type, addr_slot), _listener(proto.listen(port)) {} | |
7c673cae FG |
194 | |
195 | template <typename Protocol> | |
196 | int DPDKServerSocketImpl<Protocol>::accept(ConnectedSocket *s, const SocketOptions &options, entity_addr_t *out, Worker *w) { | |
197 | if (_listener.get_errno() < 0) | |
198 | return _listener.get_errno(); | |
199 | auto c = _listener.accept(); | |
200 | if (!c) | |
201 | return -EAGAIN; | |
202 | ||
11fdf7f2 | 203 | if (out) { |
7c673cae | 204 | *out = c->remote_addr(); |
11fdf7f2 TL |
205 | out->set_type(addr_type); |
206 | } | |
7c673cae FG |
207 | std::unique_ptr<NativeConnectedSocketImpl<Protocol>> csi( |
208 | new NativeConnectedSocketImpl<Protocol>(std::move(*c))); | |
209 | *s = ConnectedSocket(std::move(csi)); | |
210 | return 0; | |
211 | } | |
212 | ||
213 | template <typename Protocol> | |
214 | void DPDKServerSocketImpl<Protocol>::abort_accept() { | |
215 | _listener.abort_accept(); | |
216 | } | |
217 | ||
218 | class DPDKWorker : public Worker { | |
219 | struct Impl { | |
220 | unsigned id; | |
221 | interface _netif; | |
222 | std::shared_ptr<DPDKDevice> _dev; | |
223 | ipv4 _inet; | |
224 | Impl(CephContext *cct, unsigned i, EventCenter *c, std::shared_ptr<DPDKDevice> dev); | |
11fdf7f2 | 225 | ~Impl(); |
7c673cae FG |
226 | }; |
227 | std::unique_ptr<Impl> _impl; | |
228 | ||
11fdf7f2 | 229 | virtual void initialize() override; |
7c673cae FG |
230 | void set_ipv4_packet_filter(ip_packet_filter* filter) { |
231 | _impl->_inet.set_packet_filter(filter); | |
232 | } | |
233 | using tcp4 = tcp<ipv4_traits>; | |
234 | ||
235 | public: | |
236 | explicit DPDKWorker(CephContext *c, unsigned i): Worker(c, i) {} | |
9f95a23c TL |
237 | virtual int listen(entity_addr_t &addr, unsigned addr_slot, |
238 | const SocketOptions &opts, ServerSocket *) override; | |
7c673cae FG |
239 | virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; |
240 | void arp_learn(ethernet_address l2, ipv4_address l3) { | |
241 | _impl->_inet.learn(l2, l3); | |
242 | } | |
243 | virtual void destroy() override { | |
244 | _impl.reset(); | |
245 | } | |
246 | ||
247 | friend class DPDKServerSocketImpl<tcp4>; | |
248 | }; | |
249 | ||
20effc67 | 250 | using namespace dpdk; |
7c673cae | 251 | class DPDKStack : public NetworkStack { |
20effc67 | 252 | std::vector<std::function<void()> > funcs; |
f67539c2 TL |
253 | |
254 | virtual Worker* create_worker(CephContext *c, unsigned worker_id) override { | |
255 | return new DPDKWorker(c, worker_id); | |
256 | } | |
20effc67 | 257 | virtual void rename_thread(unsigned id) override {} |
f67539c2 | 258 | |
7c673cae | 259 | public: |
20effc67 TL |
260 | explicit DPDKStack(CephContext *cct): NetworkStack(cct), eal(cct) { |
261 | funcs.reserve(cct->_conf->ms_async_op_threads); | |
7c673cae | 262 | } |
11fdf7f2 | 263 | virtual bool support_local_listen_table() const override { return true; } |
7c673cae | 264 | |
20effc67 | 265 | virtual void spawn_worker(std::function<void ()> &&func) override; |
11fdf7f2 | 266 | virtual void join_worker(unsigned i) override; |
20effc67 TL |
267 | private: |
268 | dpdk::eal eal; | |
7c673cae FG |
269 | }; |
270 | ||
271 | #endif |