]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/PosixStack.cc
update sources to 12.2.10
[ceph.git] / ceph / src / msg / async / PosixStack.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include <sys/socket.h>
18#include <netinet/tcp.h>
19#include <netinet/in.h>
20#include <arpa/inet.h>
21#include <errno.h>
22
23#include <algorithm>
24
25#include "PosixStack.h"
26
27#include "include/buffer.h"
28#include "include/str_list.h"
7c673cae
FG
29#include "common/errno.h"
30#include "common/strtol.h"
31#include "common/dout.h"
7c673cae 32#include "common/simple_spin.h"
3efd9988 33#include "msg/Messenger.h"
91327a77 34#include "include/compat.h"
3efd9988 35#include "include/sock_compat.h"
7c673cae
FG
36
37#define dout_subsys ceph_subsys_ms
38#undef dout_prefix
39#define dout_prefix *_dout << "PosixStack "
40
41class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
42 NetHandler &handler;
43 int _fd;
44 entity_addr_t sa;
45 bool connected;
7c673cae
FG
46
47 public:
48 explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
49 : handler(h), _fd(f), sa(sa), connected(connected) {}
50
51 int is_connected() override {
52 if (connected)
53 return 1;
54
55 int r = handler.reconnect(sa, _fd);
56 if (r == 0) {
57 connected = true;
58 return 1;
59 } else if (r < 0) {
60 return r;
61 } else {
62 return 0;
63 }
64 }
65
66 ssize_t zero_copy_read(bufferptr&) override {
67 return -EOPNOTSUPP;
68 }
69
70 ssize_t read(char *buf, size_t len) override {
71 ssize_t r = ::read(_fd, buf, len);
72 if (r < 0)
73 r = -errno;
74 return r;
75 }
76
7c673cae
FG
77 // return the sent length
78 // < 0 means error occured
79 static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
80 {
7c673cae
FG
81 size_t sent = 0;
82 while (1) {
3efd9988 83 MSGR_SIGPIPE_STOPPER;
7c673cae 84 ssize_t r;
7c673cae 85 r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
7c673cae
FG
86 if (r < 0) {
87 if (errno == EINTR) {
88 continue;
89 } else if (errno == EAGAIN) {
90 break;
91 }
92 return -errno;
93 }
94
95 sent += r;
96 if (len == sent) break;
97
98 while (r > 0) {
99 if (msg.msg_iov[0].iov_len <= (size_t)r) {
100 // drain this whole item
101 r -= msg.msg_iov[0].iov_len;
102 msg.msg_iov++;
103 msg.msg_iovlen--;
104 } else {
105 msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
106 msg.msg_iov[0].iov_len -= r;
107 break;
108 }
109 }
110 }
7c673cae
FG
111 return (ssize_t)sent;
112 }
113
114 ssize_t send(bufferlist &bl, bool more) override {
115 size_t sent_bytes = 0;
116 std::list<bufferptr>::const_iterator pb = bl.buffers().begin();
117 uint64_t left_pbrs = bl.buffers().size();
118 while (left_pbrs) {
119 struct msghdr msg;
120 struct iovec msgvec[IOV_MAX];
121 uint64_t size = MIN(left_pbrs, IOV_MAX);
122 left_pbrs -= size;
123 memset(&msg, 0, sizeof(msg));
124 msg.msg_iovlen = 0;
125 msg.msg_iov = msgvec;
126 unsigned msglen = 0;
127 while (size > 0) {
128 msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
129 msgvec[msg.msg_iovlen].iov_len = pb->length();
130 msg.msg_iovlen++;
131 msglen += pb->length();
132 ++pb;
133 size--;
134 }
135
136 ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
137 if (r < 0)
138 return r;
139
140 // "r" is the remaining length
141 sent_bytes += r;
142 if (static_cast<unsigned>(r) < msglen)
143 break;
144 // only "r" == 0 continue
145 }
146
147 if (sent_bytes) {
148 bufferlist swapped;
149 if (sent_bytes < bl.length()) {
150 bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
151 bl.swap(swapped);
152 } else {
153 bl.clear();
154 }
155 }
156
157 return static_cast<ssize_t>(sent_bytes);
158 }
159 void shutdown() override {
160 ::shutdown(_fd, SHUT_RDWR);
161 }
162 void close() override {
163 ::close(_fd);
164 }
165 int fd() const override {
166 return _fd;
167 }
168 friend class PosixServerSocketImpl;
169 friend class PosixNetworkStack;
170};
171
172class PosixServerSocketImpl : public ServerSocketImpl {
173 NetHandler &handler;
174 int _fd;
175
176 public:
177 explicit PosixServerSocketImpl(NetHandler &h, int f): handler(h), _fd(f) {}
178 int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
179 void abort_accept() override {
180 ::close(_fd);
181 }
182 int fd() const override {
183 return _fd;
184 }
185};
186
187int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
188 assert(sock);
189 sockaddr_storage ss;
190 socklen_t slen = sizeof(ss);
91327a77 191 int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
7c673cae
FG
192 if (sd < 0) {
193 return -errno;
194 }
195
7c673cae
FG
196 int r = handler.set_nonblock(sd);
197 if (r < 0) {
198 ::close(sd);
199 return -errno;
200 }
201
202 r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
203 if (r < 0) {
204 ::close(sd);
205 return -errno;
206 }
207
208 assert(NULL != out); //out should not be NULL in accept connection
209
210 out->set_sockaddr((sockaddr*)&ss);
211 handler.set_priority(sd, opt.priority, out->get_family());
212
213 std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
214 *sock = ConnectedSocket(std::move(csi));
215 return 0;
216}
217
218void PosixWorker::initialize()
219{
220}
221
222int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
223 ServerSocket *sock)
224{
225 int listen_sd = net.create_socket(sa.get_family(), true);
226 if (listen_sd < 0) {
227 return -errno;
228 }
229
230 int r = net.set_nonblock(listen_sd);
231 if (r < 0) {
232 ::close(listen_sd);
233 return -errno;
234 }
235
7c673cae
FG
236 r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
237 if (r < 0) {
238 ::close(listen_sd);
239 return -errno;
240 }
241
242 r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
243 if (r < 0) {
244 r = -errno;
245 ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
246 << ": " << cpp_strerror(r) << dendl;
247 ::close(listen_sd);
248 return r;
249 }
250
224ce89b 251 r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
7c673cae
FG
252 if (r < 0) {
253 r = -errno;
254 lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
255 ::close(listen_sd);
256 return r;
257 }
258
259 *sock = ServerSocket(
260 std::unique_ptr<PosixServerSocketImpl>(
261 new PosixServerSocketImpl(net, listen_sd)));
262 return 0;
263}
264
265int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
266 int sd;
267
268 if (opts.nonblock) {
269 sd = net.nonblock_connect(addr, opts.connect_bind_addr);
270 } else {
271 sd = net.connect(addr, opts.connect_bind_addr);
272 }
273
274 if (sd < 0) {
275 return -errno;
276 }
277
278 net.set_priority(sd, opts.priority, addr.get_family());
279 *socket = ConnectedSocket(
280 std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
281 return 0;
282}
283
284PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
285 : NetworkStack(c, t)
286{
287 vector<string> corestrs;
288 get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
289 for (auto & corestr : corestrs) {
290 string err;
291 int coreid = strict_strtol(corestr.c_str(), 10, &err);
292 if (err == "")
293 coreids.push_back(coreid);
294 else
295 lderr(cct) << __func__ << " failed to parse " << corestr << " in " << cct->_conf->ms_async_affinity_cores << dendl;
296 }
297}