]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2014 Cloudius Systems, Ltd. | |
20 | */ | |
21 | ||
22 | #pragma once | |
23 | ||
11fdf7f2 | 24 | #include <seastar/core/sharded.hh> |
f67539c2 | 25 | #include <seastar/core/internal/pollable_fd.hh> |
11fdf7f2 | 26 | #include <seastar/net/stack.hh> |
9f95a23c | 27 | #include <seastar/core/polymorphic_temporary_buffer.hh> |
f67539c2 | 28 | #include <seastar/core/internal/buffer_allocator.hh> |
1e59de90 | 29 | #include <seastar/util/program-options.hh> |
11fdf7f2 TL |
30 | |
31 | namespace seastar { | |
32 | ||
33 | namespace net { | |
34 | ||
35 | using namespace seastar; | |
36 | ||
37 | // We can't keep this in any of the socket servers as instance members, because a connection can | |
38 | // outlive the socket server. To avoid having the whole socket_server tracked as a shared pointer, | |
39 | // we will have a conntrack structure. | |
40 | // | |
41 | // Right now this class is used by the posix_server_socket_impl, but it could be used by any other. | |
42 | class conntrack { | |
43 | class load_balancer { | |
44 | std::vector<unsigned> _cpu_load; | |
45 | public: | |
46 | load_balancer() : _cpu_load(size_t(smp::count), 0) {} | |
47 | void closed_cpu(shard_id cpu) { | |
48 | _cpu_load[cpu]--; | |
49 | } | |
50 | shard_id next_cpu() { | |
51 | // FIXME: The naive algorithm will just round robin the connections around the shards. | |
52 | // A more complex version can keep track of the amount of activity in each connection, | |
53 | // and use that information. | |
54 | auto min_el = std::min_element(_cpu_load.begin(), _cpu_load.end()); | |
55 | auto cpu = shard_id(std::distance(_cpu_load.begin(), min_el)); | |
56 | _cpu_load[cpu]++; | |
57 | return cpu; | |
58 | } | |
59 | shard_id force_cpu(shard_id cpu) { | |
60 | _cpu_load[cpu]++; | |
61 | return cpu; | |
62 | } | |
63 | }; | |
64 | ||
65 | lw_shared_ptr<load_balancer> _lb; | |
66 | void closed_cpu(shard_id cpu) { | |
67 | _lb->closed_cpu(cpu); | |
68 | } | |
69 | public: | |
70 | class handle { | |
71 | shard_id _host_cpu; | |
72 | shard_id _target_cpu; | |
73 | foreign_ptr<lw_shared_ptr<load_balancer>> _lb; | |
74 | public: | |
75 | handle() : _lb(nullptr) {} | |
76 | handle(shard_id cpu, lw_shared_ptr<load_balancer> lb) | |
f67539c2 | 77 | : _host_cpu(this_shard_id()) |
11fdf7f2 TL |
78 | , _target_cpu(cpu) |
79 | , _lb(make_foreign(std::move(lb))) {} | |
80 | ||
81 | handle(const handle&) = delete; | |
82 | handle(handle&&) = default; | |
83 | ~handle() { | |
84 | if (!_lb) { | |
85 | return; | |
86 | } | |
9f95a23c TL |
87 | // FIXME: future is discarded |
88 | (void)smp::submit_to(_host_cpu, [cpu = _target_cpu, lb = std::move(_lb)] { | |
11fdf7f2 TL |
89 | lb->closed_cpu(cpu); |
90 | }); | |
91 | } | |
92 | shard_id cpu() { | |
93 | return _target_cpu; | |
94 | } | |
95 | }; | |
96 | friend class handle; | |
97 | ||
98 | conntrack() : _lb(make_lw_shared<load_balancer>()) {} | |
99 | handle get_handle() { | |
100 | return handle(_lb->next_cpu(), _lb); | |
101 | } | |
102 | handle get_handle(shard_id cpu) { | |
103 | return handle(_lb->force_cpu(cpu), _lb); | |
104 | } | |
105 | }; | |
106 | ||
f67539c2 TL |
107 | class posix_data_source_impl final : public data_source_impl, private internal::buffer_allocator { |
108 | std::pmr::polymorphic_allocator<char>* _buffer_allocator; | |
109 | pollable_fd _fd; | |
110 | connected_socket_input_stream_config _config; | |
111 | private: | |
112 | virtual temporary_buffer<char> allocate_buffer() override; | |
11fdf7f2 | 113 | public: |
f67539c2 TL |
114 | explicit posix_data_source_impl(pollable_fd fd, connected_socket_input_stream_config config, |
115 | std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) | |
116 | : _buffer_allocator(allocator), _fd(std::move(fd)), _config(config) { | |
117 | } | |
11fdf7f2 TL |
118 | future<temporary_buffer<char>> get() override; |
119 | future<> close() override; | |
120 | }; | |
121 | ||
122 | class posix_data_sink_impl : public data_sink_impl { | |
f67539c2 | 123 | pollable_fd _fd; |
11fdf7f2 TL |
124 | packet _p; |
125 | public: | |
f67539c2 | 126 | explicit posix_data_sink_impl(pollable_fd fd) : _fd(std::move(fd)) {} |
11fdf7f2 TL |
127 | using data_sink_impl::put; |
128 | future<> put(packet p) override; | |
129 | future<> put(temporary_buffer<char> buf) override; | |
130 | future<> close() override; | |
131 | }; | |
132 | ||
f67539c2 | 133 | class posix_ap_server_socket_impl : public server_socket_impl { |
9f95a23c | 134 | using protocol_and_socket_address = std::tuple<int, socket_address>; |
11fdf7f2 TL |
135 | struct connection { |
136 | pollable_fd fd; | |
137 | socket_address addr; | |
9f95a23c TL |
138 | conntrack::handle connection_tracking_handle; |
139 | connection(pollable_fd xfd, socket_address xaddr, conntrack::handle cth) : fd(std::move(xfd)), addr(xaddr), connection_tracking_handle(std::move(cth)) {} | |
11fdf7f2 | 140 | }; |
9f95a23c TL |
141 | using sockets_map_t = std::unordered_map<protocol_and_socket_address, promise<accept_result>>; |
142 | using conn_map_t = std::unordered_multimap<protocol_and_socket_address, connection>; | |
143 | static thread_local sockets_map_t sockets; | |
144 | static thread_local conn_map_t conn_q; | |
145 | int _protocol; | |
11fdf7f2 | 146 | socket_address _sa; |
f67539c2 | 147 | std::pmr::polymorphic_allocator<char>* _allocator; |
11fdf7f2 | 148 | public: |
f67539c2 | 149 | explicit posix_ap_server_socket_impl(int protocol, socket_address sa, std::pmr::polymorphic_allocator<char>* allocator = memory::malloc_allocator) : _protocol(protocol), _sa(sa), _allocator(allocator) {} |
9f95a23c | 150 | virtual future<accept_result> accept() override; |
11fdf7f2 | 151 | virtual void abort_accept() override; |
9f95a23c TL |
152 | socket_address local_address() const override { |
153 | return _sa; | |
154 | } | |
f67539c2 | 155 | static void move_connected_socket(int protocol, socket_address sa, pollable_fd fd, socket_address addr, conntrack::handle handle, std::pmr::polymorphic_allocator<char>* allocator); |
9f95a23c TL |
156 | |
157 | template <typename T> | |
158 | friend class std::hash; | |
11fdf7f2 | 159 | }; |
11fdf7f2 | 160 | |
f67539c2 | 161 | class posix_server_socket_impl : public server_socket_impl { |
11fdf7f2 | 162 | socket_address _sa; |
9f95a23c | 163 | int _protocol; |
11fdf7f2 TL |
164 | pollable_fd _lfd; |
165 | conntrack _conntrack; | |
166 | server_socket::load_balancing_algorithm _lba; | |
9f95a23c | 167 | shard_id _fixed_cpu; |
f67539c2 | 168 | std::pmr::polymorphic_allocator<char>* _allocator; |
11fdf7f2 | 169 | public: |
9f95a23c TL |
170 | explicit posix_server_socket_impl(int protocol, socket_address sa, pollable_fd lfd, |
171 | server_socket::load_balancing_algorithm lba, shard_id fixed_cpu, | |
f67539c2 | 172 | std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _sa(sa), _protocol(protocol), _lfd(std::move(lfd)), _lba(lba), _fixed_cpu(fixed_cpu), _allocator(allocator) {} |
9f95a23c | 173 | virtual future<accept_result> accept() override; |
11fdf7f2 | 174 | virtual void abort_accept() override; |
9f95a23c | 175 | virtual socket_address local_address() const override; |
11fdf7f2 | 176 | }; |
11fdf7f2 | 177 | |
f67539c2 | 178 | class posix_reuseport_server_socket_impl : public server_socket_impl { |
11fdf7f2 | 179 | socket_address _sa; |
9f95a23c | 180 | int _protocol; |
11fdf7f2 | 181 | pollable_fd _lfd; |
f67539c2 | 182 | std::pmr::polymorphic_allocator<char>* _allocator; |
11fdf7f2 | 183 | public: |
9f95a23c | 184 | explicit posix_reuseport_server_socket_impl(int protocol, socket_address sa, pollable_fd lfd, |
f67539c2 | 185 | std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _sa(sa), _protocol(protocol), _lfd(std::move(lfd)), _allocator(allocator) {} |
9f95a23c | 186 | virtual future<accept_result> accept() override; |
11fdf7f2 | 187 | virtual void abort_accept() override; |
9f95a23c | 188 | virtual socket_address local_address() const override; |
11fdf7f2 | 189 | }; |
11fdf7f2 TL |
190 | |
191 | class posix_network_stack : public network_stack { | |
192 | private: | |
193 | const bool _reuseport; | |
9f95a23c | 194 | protected: |
f67539c2 | 195 | std::pmr::polymorphic_allocator<char>* _allocator; |
11fdf7f2 | 196 | public: |
20effc67 | 197 | explicit posix_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator); |
11fdf7f2 TL |
198 | virtual server_socket listen(socket_address sa, listen_options opts) override; |
199 | virtual ::seastar::socket socket() override; | |
9f95a23c | 200 | virtual net::udp_channel make_udp_channel(const socket_address&) override; |
20effc67 | 201 | static future<std::unique_ptr<network_stack>> create(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) { |
9f95a23c | 202 | return make_ready_future<std::unique_ptr<network_stack>>(std::unique_ptr<network_stack>(new posix_network_stack(opts, allocator))); |
11fdf7f2 TL |
203 | } |
204 | virtual bool has_per_core_namespace() override { return _reuseport; }; | |
9f95a23c TL |
205 | bool supports_ipv6() const override; |
206 | std::vector<network_interface> network_interfaces() override; | |
11fdf7f2 TL |
207 | }; |
208 | ||
209 | class posix_ap_network_stack : public posix_network_stack { | |
210 | private: | |
211 | const bool _reuseport; | |
212 | public: | |
20effc67 | 213 | posix_ap_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator); |
11fdf7f2 | 214 | virtual server_socket listen(socket_address sa, listen_options opts) override; |
20effc67 | 215 | static future<std::unique_ptr<network_stack>> create(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) { |
9f95a23c | 216 | return make_ready_future<std::unique_ptr<network_stack>>(std::unique_ptr<network_stack>(new posix_ap_network_stack(opts, allocator))); |
11fdf7f2 TL |
217 | } |
218 | }; | |
219 | ||
20effc67 | 220 | network_stack_entry register_posix_stack(); |
11fdf7f2 TL |
221 | } |
222 | ||
223 | } |