]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/dpdk/DPDKStack.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / msg / async / dpdk / DPDKStack.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2/*
3 * Ceph - scalable distributed file system
4 *
5 * Copyright (C) 2015 XSky <haomai@xsky.com>
6 *
7 * Author: Haomai Wang <haomaiwang@gmail.com>
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15#ifndef CEPH_MSG_DPDKSTACK_H
16#define CEPH_MSG_DPDKSTACK_H
17
18#include <functional>
20effc67 19#include <optional>
7c673cae
FG
20
21#include "common/ceph_context.h"
7c673cae
FG
22
23#include "msg/async/Stack.h"
7c673cae
FG
24#include "net.h"
25#include "const.h"
26#include "IP.h"
27#include "Packet.h"
20effc67 28#include "dpdk_rte.h"
7c673cae
FG
29
30class interface;
31
32template <typename Protocol>
33class NativeConnectedSocketImpl;
34
35// DPDKServerSocketImpl
36template <typename Protocol>
37class DPDKServerSocketImpl : public ServerSocketImpl {
38 typename Protocol::listener _listener;
39 public:
11fdf7f2 40 DPDKServerSocketImpl(Protocol& proto, uint16_t port, const SocketOptions &opt,
9f95a23c 41 int type, unsigned addr_slot);
7c673cae
FG
42 int listen() {
43 return _listener.listen();
44 }
45 virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
46 virtual void abort_accept() override;
47 virtual int fd() const override {
48 return _listener.fd();
49 }
50};
51
52// NativeConnectedSocketImpl
53template <typename Protocol>
54class NativeConnectedSocketImpl : public ConnectedSocketImpl {
55 typename Protocol::connection _conn;
56 uint32_t _cur_frag = 0;
57 uint32_t _cur_off = 0;
20effc67
TL
58 std::optional<Packet> _buf;
59 std::optional<bufferptr> _cache_ptr;
7c673cae
FG
60
61 public:
62 explicit NativeConnectedSocketImpl(typename Protocol::connection conn)
63 : _conn(std::move(conn)) {}
64 NativeConnectedSocketImpl(NativeConnectedSocketImpl &&rhs)
65 : _conn(std::move(rhs._conn)), _buf(std::move(rhs.buf)) {}
66 virtual int is_connected() override {
67 return _conn.is_connected();
68 }
69
70 virtual ssize_t read(char *buf, size_t len) override {
71 size_t left = len;
72 ssize_t r = 0;
73 size_t off = 0;
74 while (left > 0) {
75 if (!_cache_ptr) {
20effc67 76 _cache_ptr.emplace();
7c673cae
FG
77 r = zero_copy_read(*_cache_ptr);
78 if (r <= 0) {
20effc67 79 _cache_ptr.reset();
7c673cae
FG
80 if (r == -EAGAIN)
81 break;
82 return r;
83 }
84 }
85 if (_cache_ptr->length() <= left) {
86 _cache_ptr->copy_out(0, _cache_ptr->length(), buf+off);
87 left -= _cache_ptr->length();
88 off += _cache_ptr->length();
20effc67 89 _cache_ptr.reset();
7c673cae
FG
90 } else {
91 _cache_ptr->copy_out(0, left, buf+off);
92 _cache_ptr->set_offset(_cache_ptr->offset() + left);
93 _cache_ptr->set_length(_cache_ptr->length() - left);
94 left = 0;
95 break;
96 }
97 }
98 return len - left ? len - left : -EAGAIN;
99 }
100
9f95a23c
TL
101private:
102 ssize_t zero_copy_read(bufferptr &data) {
7c673cae
FG
103 auto err = _conn.get_errno();
104 if (err <= 0)
105 return err;
106
107 if (!_buf) {
108 _buf = std::move(_conn.read());
109 if (!_buf)
110 return -EAGAIN;
111 }
112
113 fragment &f = _buf->frag(_cur_frag);
114 Packet p = _buf->share(_cur_off, f.size);
115 auto del = std::bind(
116 [](Packet &p) {}, std::move(p));
117 data = buffer::claim_buffer(
118 f.size, f.base, make_deleter(std::move(del)));
119 if (++_cur_frag == _buf->nr_frags()) {
120 _cur_frag = 0;
121 _cur_off = 0;
20effc67 122 _buf.reset();
7c673cae
FG
123 } else {
124 _cur_off += f.size;
125 }
11fdf7f2 126 ceph_assert(data.length());
7c673cae
FG
127 return data.length();
128 }
129 virtual ssize_t send(bufferlist &bl, bool more) override {
130 auto err = _conn.get_errno();
131 if (err < 0)
132 return (ssize_t)err;
133
134 size_t available = _conn.peek_sent_available();
135 if (available == 0) {
136 return 0;
137 }
138
139 std::vector<fragment> frags;
9f95a23c 140 auto pb = bl.buffers().begin();
7c673cae
FG
141 uint64_t len = 0;
142 uint64_t seglen = 0;
9f95a23c 143 while (len < available && pb != bl.buffers().end()) {
7c673cae 144 seglen = pb->length();
9f95a23c
TL
145 // Buffer length is zero, no need to send, so skip it
146 if (seglen == 0) {
147 ++pb;
148 continue;
149 }
7c673cae
FG
150 if (len + seglen > available) {
151 // don't continue if we enough at least 1 fragment since no available
152 // space for next ptr.
153 if (len > 0)
154 break;
11fdf7f2 155 seglen = std::min(seglen, available);
7c673cae
FG
156 }
157 len += seglen;
158 frags.push_back(fragment{(char*)pb->c_str(), seglen});
159 ++pb;
160 }
161
162 if (len != bl.length()) {
163 bufferlist swapped;
164 bl.splice(0, len, &swapped);
165 auto del = std::bind(
166 [](bufferlist &bl) {}, std::move(swapped));
167 return _conn.send(Packet(std::move(frags), make_deleter(std::move(del))));
168 } else {
169 auto del = std::bind(
170 [](bufferlist &bl) {}, std::move(bl));
171
172 return _conn.send(Packet(std::move(frags), make_deleter(std::move(del))));
173 }
174 }
9f95a23c
TL
175
176public:
7c673cae
FG
177 virtual void shutdown() override {
178 _conn.close_write();
179 }
180 // FIXME need to impl close
181 virtual void close() override {
182 _conn.close_write();
183 }
184 virtual int fd() const override {
185 return _conn.fd();
186 }
187};
188
189template <typename Protocol>
190DPDKServerSocketImpl<Protocol>::DPDKServerSocketImpl(
9f95a23c
TL
191 Protocol& proto, uint16_t port, const SocketOptions &opt,
192 int type, unsigned addr_slot)
193 : ServerSocketImpl(type, addr_slot), _listener(proto.listen(port)) {}
7c673cae
FG
194
195template <typename Protocol>
196int DPDKServerSocketImpl<Protocol>::accept(ConnectedSocket *s, const SocketOptions &options, entity_addr_t *out, Worker *w) {
197 if (_listener.get_errno() < 0)
198 return _listener.get_errno();
199 auto c = _listener.accept();
200 if (!c)
201 return -EAGAIN;
202
11fdf7f2 203 if (out) {
7c673cae 204 *out = c->remote_addr();
11fdf7f2
TL
205 out->set_type(addr_type);
206 }
7c673cae
FG
207 std::unique_ptr<NativeConnectedSocketImpl<Protocol>> csi(
208 new NativeConnectedSocketImpl<Protocol>(std::move(*c)));
209 *s = ConnectedSocket(std::move(csi));
210 return 0;
211}
212
213template <typename Protocol>
214void DPDKServerSocketImpl<Protocol>::abort_accept() {
215 _listener.abort_accept();
216}
217
218class DPDKWorker : public Worker {
219 struct Impl {
220 unsigned id;
221 interface _netif;
222 std::shared_ptr<DPDKDevice> _dev;
223 ipv4 _inet;
224 Impl(CephContext *cct, unsigned i, EventCenter *c, std::shared_ptr<DPDKDevice> dev);
11fdf7f2 225 ~Impl();
7c673cae
FG
226 };
227 std::unique_ptr<Impl> _impl;
228
11fdf7f2 229 virtual void initialize() override;
7c673cae
FG
230 void set_ipv4_packet_filter(ip_packet_filter* filter) {
231 _impl->_inet.set_packet_filter(filter);
232 }
233 using tcp4 = tcp<ipv4_traits>;
234
235 public:
236 explicit DPDKWorker(CephContext *c, unsigned i): Worker(c, i) {}
9f95a23c
TL
237 virtual int listen(entity_addr_t &addr, unsigned addr_slot,
238 const SocketOptions &opts, ServerSocket *) override;
7c673cae
FG
239 virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
240 void arp_learn(ethernet_address l2, ipv4_address l3) {
241 _impl->_inet.learn(l2, l3);
242 }
243 virtual void destroy() override {
244 _impl.reset();
245 }
246
247 friend class DPDKServerSocketImpl<tcp4>;
248};
249
20effc67 250using namespace dpdk;
7c673cae 251class DPDKStack : public NetworkStack {
20effc67 252 std::vector<std::function<void()> > funcs;
f67539c2
TL
253
254 virtual Worker* create_worker(CephContext *c, unsigned worker_id) override {
255 return new DPDKWorker(c, worker_id);
256 }
20effc67 257 virtual void rename_thread(unsigned id) override {}
f67539c2 258
7c673cae 259 public:
20effc67
TL
260 explicit DPDKStack(CephContext *cct): NetworkStack(cct), eal(cct) {
261 funcs.reserve(cct->_conf->ms_async_op_threads);
7c673cae 262 }
11fdf7f2 263 virtual bool support_local_listen_table() const override { return true; }
7c673cae 264
20effc67 265 virtual void spawn_worker(std::function<void ()> &&func) override;
11fdf7f2 266 virtual void join_worker(unsigned i) override;
20effc67
TL
267 private:
268 dpdk::eal eal;
7c673cae
FG
269};
270
271#endif