]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/PosixStack.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / PosixStack.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include <sys/socket.h>
18#include <netinet/tcp.h>
19#include <netinet/in.h>
20#include <arpa/inet.h>
21#include <errno.h>
22
23#include <algorithm>
24
25#include "PosixStack.h"
26
27#include "include/buffer.h"
28#include "include/str_list.h"
7c673cae
FG
29#include "common/errno.h"
30#include "common/strtol.h"
31#include "common/dout.h"
3efd9988 32#include "msg/Messenger.h"
91327a77 33#include "include/compat.h"
3efd9988 34#include "include/sock_compat.h"
7c673cae
FG
35
36#define dout_subsys ceph_subsys_ms
37#undef dout_prefix
38#define dout_prefix *_dout << "PosixStack "
39
40class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
41 NetHandler &handler;
42 int _fd;
43 entity_addr_t sa;
44 bool connected;
7c673cae
FG
45
46 public:
47 explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
48 : handler(h), _fd(f), sa(sa), connected(connected) {}
49
50 int is_connected() override {
51 if (connected)
52 return 1;
53
54 int r = handler.reconnect(sa, _fd);
55 if (r == 0) {
56 connected = true;
57 return 1;
58 } else if (r < 0) {
59 return r;
60 } else {
61 return 0;
62 }
63 }
64
65 ssize_t zero_copy_read(bufferptr&) override {
66 return -EOPNOTSUPP;
67 }
68
69 ssize_t read(char *buf, size_t len) override {
70 ssize_t r = ::read(_fd, buf, len);
71 if (r < 0)
72 r = -errno;
73 return r;
74 }
75
7c673cae 76 // return the sent length
11fdf7f2 77 // < 0 means error occurred
7c673cae
FG
78 static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
79 {
7c673cae
FG
80 size_t sent = 0;
81 while (1) {
3efd9988 82 MSGR_SIGPIPE_STOPPER;
7c673cae 83 ssize_t r;
7c673cae 84 r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
7c673cae
FG
85 if (r < 0) {
86 if (errno == EINTR) {
87 continue;
88 } else if (errno == EAGAIN) {
89 break;
90 }
91 return -errno;
92 }
93
94 sent += r;
95 if (len == sent) break;
96
97 while (r > 0) {
98 if (msg.msg_iov[0].iov_len <= (size_t)r) {
99 // drain this whole item
100 r -= msg.msg_iov[0].iov_len;
101 msg.msg_iov++;
102 msg.msg_iovlen--;
103 } else {
104 msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
105 msg.msg_iov[0].iov_len -= r;
106 break;
107 }
108 }
109 }
7c673cae
FG
110 return (ssize_t)sent;
111 }
112
113 ssize_t send(bufferlist &bl, bool more) override {
114 size_t sent_bytes = 0;
11fdf7f2
TL
115 auto pb = std::cbegin(bl.buffers());
116 uint64_t left_pbrs = std::size(bl.buffers());
7c673cae
FG
117 while (left_pbrs) {
118 struct msghdr msg;
119 struct iovec msgvec[IOV_MAX];
11fdf7f2 120 uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX);
7c673cae
FG
121 left_pbrs -= size;
122 memset(&msg, 0, sizeof(msg));
11fdf7f2 123 msg.msg_iovlen = size;
7c673cae
FG
124 msg.msg_iov = msgvec;
125 unsigned msglen = 0;
11fdf7f2
TL
126 for (auto iov = msgvec; iov != msgvec + size; iov++) {
127 iov->iov_base = (void*)(pb->c_str());
128 iov->iov_len = pb->length();
129 msglen += pb->length();
130 ++pb;
7c673cae 131 }
7c673cae
FG
132 ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
133 if (r < 0)
134 return r;
135
136 // "r" is the remaining length
137 sent_bytes += r;
138 if (static_cast<unsigned>(r) < msglen)
139 break;
140 // only "r" == 0 continue
141 }
142
143 if (sent_bytes) {
144 bufferlist swapped;
145 if (sent_bytes < bl.length()) {
146 bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
147 bl.swap(swapped);
148 } else {
149 bl.clear();
150 }
151 }
152
153 return static_cast<ssize_t>(sent_bytes);
154 }
155 void shutdown() override {
156 ::shutdown(_fd, SHUT_RDWR);
157 }
158 void close() override {
159 ::close(_fd);
160 }
161 int fd() const override {
162 return _fd;
163 }
11fdf7f2
TL
164 int socket_fd() const override {
165 return _fd;
166 }
7c673cae
FG
167 friend class PosixServerSocketImpl;
168 friend class PosixNetworkStack;
169};
170
171class PosixServerSocketImpl : public ServerSocketImpl {
172 NetHandler &handler;
173 int _fd;
174
175 public:
11fdf7f2
TL
176 explicit PosixServerSocketImpl(NetHandler &h, int f,
177 const entity_addr_t& listen_addr, unsigned slot)
178 : ServerSocketImpl(listen_addr.get_type(), slot),
179 handler(h), _fd(f) {}
7c673cae
FG
180 int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
181 void abort_accept() override {
182 ::close(_fd);
183 }
184 int fd() const override {
185 return _fd;
186 }
187};
188
189int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
11fdf7f2 190 ceph_assert(sock);
7c673cae
FG
191 sockaddr_storage ss;
192 socklen_t slen = sizeof(ss);
91327a77 193 int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
7c673cae
FG
194 if (sd < 0) {
195 return -errno;
196 }
197
7c673cae
FG
198 int r = handler.set_nonblock(sd);
199 if (r < 0) {
200 ::close(sd);
201 return -errno;
202 }
203
204 r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
205 if (r < 0) {
206 ::close(sd);
207 return -errno;
208 }
209
11fdf7f2 210 ceph_assert(NULL != out); //out should not be NULL in accept connection
7c673cae 211
11fdf7f2 212 out->set_type(addr_type);
7c673cae
FG
213 out->set_sockaddr((sockaddr*)&ss);
214 handler.set_priority(sd, opt.priority, out->get_family());
215
216 std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
217 *sock = ConnectedSocket(std::move(csi));
218 return 0;
219}
220
221void PosixWorker::initialize()
222{
223}
224
11fdf7f2
TL
225int PosixWorker::listen(entity_addr_t &sa,
226 unsigned addr_slot,
227 const SocketOptions &opt,
7c673cae
FG
228 ServerSocket *sock)
229{
230 int listen_sd = net.create_socket(sa.get_family(), true);
231 if (listen_sd < 0) {
232 return -errno;
233 }
234
235 int r = net.set_nonblock(listen_sd);
236 if (r < 0) {
237 ::close(listen_sd);
238 return -errno;
239 }
240
7c673cae
FG
241 r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
242 if (r < 0) {
243 ::close(listen_sd);
244 return -errno;
245 }
246
247 r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
248 if (r < 0) {
249 r = -errno;
250 ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
251 << ": " << cpp_strerror(r) << dendl;
252 ::close(listen_sd);
253 return r;
254 }
255
224ce89b 256 r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
7c673cae
FG
257 if (r < 0) {
258 r = -errno;
259 lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
260 ::close(listen_sd);
261 return r;
262 }
263
264 *sock = ServerSocket(
265 std::unique_ptr<PosixServerSocketImpl>(
11fdf7f2 266 new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
7c673cae
FG
267 return 0;
268}
269
270int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
271 int sd;
272
273 if (opts.nonblock) {
274 sd = net.nonblock_connect(addr, opts.connect_bind_addr);
275 } else {
276 sd = net.connect(addr, opts.connect_bind_addr);
277 }
278
279 if (sd < 0) {
280 return -errno;
281 }
282
283 net.set_priority(sd, opts.priority, addr.get_family());
284 *socket = ConnectedSocket(
285 std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
286 return 0;
287}
288
289PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
290 : NetworkStack(c, t)
291{
7c673cae 292}