git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/dpdk/DPDKStack.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / msg / async / dpdk / DPDKStack.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3 * Ceph - scalable distributed file system
4 *
5 * Copyright (C) 2015 XSky <haomai@xsky.com>
6 *
7 * Author: Haomai Wang <haomaiwang@gmail.com>
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15 #ifndef CEPH_MSG_DPDKSTACK_H
16 #define CEPH_MSG_DPDKSTACK_H
17
18 #include <functional>
19
20 #include "common/ceph_context.h"
21 #include "common/Tub.h"
22
23 #include "msg/async/Stack.h"
24 #include "dpdk_rte.h"
25 #include "DPDK.h"
26 #include "net.h"
27 #include "const.h"
28 #include "IP.h"
29 #include "Packet.h"
30
31 class interface;
32
33 template <typename Protocol>
34 class NativeConnectedSocketImpl;
35
36 // DPDKServerSocketImpl
37 template <typename Protocol>
38 class DPDKServerSocketImpl : public ServerSocketImpl {
39 typename Protocol::listener _listener;
40 public:
41 DPDKServerSocketImpl(Protocol& proto, uint16_t port, const SocketOptions &opt);
42 int listen() {
43 return _listener.listen();
44 }
45 virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
46 virtual void abort_accept() override;
47 virtual int fd() const override {
48 return _listener.fd();
49 }
50 };
51
52 // NativeConnectedSocketImpl
53 template <typename Protocol>
54 class NativeConnectedSocketImpl : public ConnectedSocketImpl {
55 typename Protocol::connection _conn;
56 uint32_t _cur_frag = 0;
57 uint32_t _cur_off = 0;
58 Tub<Packet> _buf;
59 Tub<bufferptr> _cache_ptr;
60
61 public:
62 explicit NativeConnectedSocketImpl(typename Protocol::connection conn)
63 : _conn(std::move(conn)) {}
64 NativeConnectedSocketImpl(NativeConnectedSocketImpl &&rhs)
65 : _conn(std::move(rhs._conn)), _buf(std::move(rhs.buf)) {}
66 virtual int is_connected() override {
67 return _conn.is_connected();
68 }
69
70 virtual ssize_t read(char *buf, size_t len) override {
71 size_t left = len;
72 ssize_t r = 0;
73 size_t off = 0;
74 while (left > 0) {
75 if (!_cache_ptr) {
76 _cache_ptr.construct();
77 r = zero_copy_read(*_cache_ptr);
78 if (r <= 0) {
79 _cache_ptr.destroy();
80 if (r == -EAGAIN)
81 break;
82 return r;
83 }
84 }
85 if (_cache_ptr->length() <= left) {
86 _cache_ptr->copy_out(0, _cache_ptr->length(), buf+off);
87 left -= _cache_ptr->length();
88 off += _cache_ptr->length();
89 _cache_ptr.destroy();
90 } else {
91 _cache_ptr->copy_out(0, left, buf+off);
92 _cache_ptr->set_offset(_cache_ptr->offset() + left);
93 _cache_ptr->set_length(_cache_ptr->length() - left);
94 left = 0;
95 break;
96 }
97 }
98 return len - left ? len - left : -EAGAIN;
99 }
100
101 virtual ssize_t zero_copy_read(bufferptr &data) override {
102 auto err = _conn.get_errno();
103 if (err <= 0)
104 return err;
105
106 if (!_buf) {
107 _buf = std::move(_conn.read());
108 if (!_buf)
109 return -EAGAIN;
110 }
111
112 fragment &f = _buf->frag(_cur_frag);
113 Packet p = _buf->share(_cur_off, f.size);
114 auto del = std::bind(
115 [](Packet &p) {}, std::move(p));
116 data = buffer::claim_buffer(
117 f.size, f.base, make_deleter(std::move(del)));
118 if (++_cur_frag == _buf->nr_frags()) {
119 _cur_frag = 0;
120 _cur_off = 0;
121 _buf.destroy();
122 } else {
123 _cur_off += f.size;
124 }
125 assert(data.length());
126 return data.length();
127 }
128 virtual ssize_t send(bufferlist &bl, bool more) override {
129 auto err = _conn.get_errno();
130 if (err < 0)
131 return (ssize_t)err;
132
133 size_t available = _conn.peek_sent_available();
134 if (available == 0) {
135 return 0;
136 }
137
138 std::vector<fragment> frags;
139 std::list<bufferptr>::const_iterator pb = bl.buffers().begin();
140 uint64_t left_pbrs = bl.buffers().size();
141 uint64_t len = 0;
142 uint64_t seglen = 0;
143 while (len < available && left_pbrs--) {
144 seglen = pb->length();
145 if (len + seglen > available) {
146 // don't continue if we enough at least 1 fragment since no available
147 // space for next ptr.
148 if (len > 0)
149 break;
150 seglen = MIN(seglen, available);
151 }
152 len += seglen;
153 frags.push_back(fragment{(char*)pb->c_str(), seglen});
154 ++pb;
155 }
156
157 if (len != bl.length()) {
158 bufferlist swapped;
159 bl.splice(0, len, &swapped);
160 auto del = std::bind(
161 [](bufferlist &bl) {}, std::move(swapped));
162 return _conn.send(Packet(std::move(frags), make_deleter(std::move(del))));
163 } else {
164 auto del = std::bind(
165 [](bufferlist &bl) {}, std::move(bl));
166
167 return _conn.send(Packet(std::move(frags), make_deleter(std::move(del))));
168 }
169 }
170 virtual void shutdown() override {
171 _conn.close_write();
172 }
173 // FIXME need to impl close
174 virtual void close() override {
175 _conn.close_write();
176 }
177 virtual int fd() const override {
178 return _conn.fd();
179 }
180 };
181
182 template <typename Protocol>
183 DPDKServerSocketImpl<Protocol>::DPDKServerSocketImpl(
184 Protocol& proto, uint16_t port, const SocketOptions &opt)
185 : _listener(proto.listen(port)) {}
186
187 template <typename Protocol>
188 int DPDKServerSocketImpl<Protocol>::accept(ConnectedSocket *s, const SocketOptions &options, entity_addr_t *out, Worker *w) {
189 if (_listener.get_errno() < 0)
190 return _listener.get_errno();
191 auto c = _listener.accept();
192 if (!c)
193 return -EAGAIN;
194
195 if (out)
196 *out = c->remote_addr();
197 std::unique_ptr<NativeConnectedSocketImpl<Protocol>> csi(
198 new NativeConnectedSocketImpl<Protocol>(std::move(*c)));
199 *s = ConnectedSocket(std::move(csi));
200 return 0;
201 }
202
203 template <typename Protocol>
204 void DPDKServerSocketImpl<Protocol>::abort_accept() {
205 _listener.abort_accept();
206 }
207
208 class DPDKWorker : public Worker {
209 struct Impl {
210 unsigned id;
211 interface _netif;
212 std::shared_ptr<DPDKDevice> _dev;
213 ipv4 _inet;
214 Impl(CephContext *cct, unsigned i, EventCenter *c, std::shared_ptr<DPDKDevice> dev);
215 ~Impl() {
216 _dev->unset_local_queue(id);
217 }
218 };
219 std::unique_ptr<Impl> _impl;
220
221 virtual void initialize();
222 void set_ipv4_packet_filter(ip_packet_filter* filter) {
223 _impl->_inet.set_packet_filter(filter);
224 }
225 using tcp4 = tcp<ipv4_traits>;
226
227 public:
228 explicit DPDKWorker(CephContext *c, unsigned i): Worker(c, i) {}
229 virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
230 virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
231 void arp_learn(ethernet_address l2, ipv4_address l3) {
232 _impl->_inet.learn(l2, l3);
233 }
234 virtual void destroy() override {
235 _impl.reset();
236 }
237
238 friend class DPDKServerSocketImpl<tcp4>;
239 };
240
241 class DPDKStack : public NetworkStack {
242 vector<std::function<void()> > funcs;
243 public:
244 explicit DPDKStack(CephContext *cct, const string &t): NetworkStack(cct, t) {
245 funcs.resize(cct->_conf->ms_async_max_op_threads);
246 }
247 virtual bool support_zero_copy_read() const override { return true; }
248 virtual bool support_local_listen_table() const { return true; }
249
250 virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
251 virtual void join_worker(unsigned i) override {
252 dpdk::eal::execute_on_master([&]() {
253 rte_eal_wait_lcore(i+1);
254 });
255 }
256 };
257
258 #endif