]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2019 ScyllaDB | |
20 | */ | |
21 | ||
22 | #pragma once | |
23 | ||
24 | #include <seastar/core/future.hh> | |
25 | #include <seastar/core/posix.hh> | |
26 | #include <seastar/core/internal/pollable_fd.hh> | |
27 | #include <seastar/core/internal/poll.hh> | |
28 | #include <seastar/core/linux-aio.hh> | |
29 | #include <seastar/core/cacheline.hh> | |
30 | #include <sys/time.h> | |
31 | #include <signal.h> | |
32 | #include <thread> | |
33 | #include <stack> | |
34 | #include <boost/any.hpp> | |
35 | #include <boost/program_options.hpp> | |
36 | #include <boost/container/static_vector.hpp> | |
37 | ||
38 | #ifdef HAVE_OSV | |
39 | #include <osv/newpoll.hh> | |
40 | #endif | |
41 | ||
42 | namespace seastar { | |
43 | ||
44 | class reactor; | |
45 | ||
46 | // FIXME: merge it with storage context below. At this point the | |
47 | // main thing to do is unify the iocb list | |
48 | struct aio_general_context { | |
49 | explicit aio_general_context(size_t nr); | |
50 | ~aio_general_context(); | |
51 | internal::linux_abi::aio_context_t io_context{}; | |
52 | std::unique_ptr<internal::linux_abi::iocb*[]> iocbs; | |
53 | internal::linux_abi::iocb** last = iocbs.get(); | |
54 | void queue(internal::linux_abi::iocb* iocb); | |
55 | size_t flush(); | |
56 | }; | |
57 | ||
58 | class aio_storage_context { | |
59 | static constexpr unsigned max_aio = 1024; | |
60 | ||
61 | class iocb_pool { | |
62 | alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool; | |
63 | std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs; | |
64 | public: | |
65 | iocb_pool(); | |
66 | internal::linux_abi::iocb& get_one(); | |
67 | void put_one(internal::linux_abi::iocb* io); | |
68 | unsigned outstanding() const; | |
69 | bool has_capacity() const; | |
70 | }; | |
71 | ||
72 | reactor* _r; | |
73 | internal::linux_abi::aio_context_t _io_context; | |
74 | boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _submission_queue; | |
75 | iocb_pool _iocb_pool; | |
76 | size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec); | |
f67539c2 TL |
77 | using pending_aio_retry_t = boost::container::static_vector<internal::linux_abi::iocb*, max_aio>; |
78 | pending_aio_retry_t _pending_aio_retry; | |
79 | internal::linux_abi::io_event _ev_buffer[max_aio]; | |
9f95a23c TL |
80 | public: |
81 | explicit aio_storage_context(reactor* r); | |
82 | ~aio_storage_context(); | |
83 | ||
84 | bool reap_completions(); | |
f67539c2 | 85 | void schedule_retry(); |
9f95a23c TL |
86 | bool submit_work(); |
87 | bool can_sleep() const; | |
88 | }; | |
89 | ||
90 | class completion_with_iocb { | |
91 | bool _in_context = false; | |
92 | internal::linux_abi::iocb _iocb; | |
93 | protected: | |
94 | completion_with_iocb(int fd, int events, void* user_data); | |
95 | void completed() { | |
96 | _in_context = false; | |
97 | } | |
98 | public: | |
99 | void maybe_queue(aio_general_context& context); | |
100 | }; | |
101 | ||
102 | class fd_kernel_completion : public kernel_completion { | |
103 | protected: | |
104 | reactor* _r; | |
105 | file_desc& _fd; | |
106 | fd_kernel_completion(reactor* r, file_desc& fd) : _r(r), _fd(fd) {} | |
107 | public: | |
108 | file_desc& fd() { | |
109 | return _fd; | |
110 | } | |
111 | }; | |
112 | ||
113 | struct hrtimer_aio_completion : public fd_kernel_completion, | |
114 | public completion_with_iocb { | |
115 | hrtimer_aio_completion(reactor* r, file_desc& fd); | |
116 | virtual void complete_with(ssize_t value) override; | |
117 | }; | |
118 | ||
119 | struct task_quota_aio_completion : public fd_kernel_completion, | |
120 | public completion_with_iocb { | |
121 | task_quota_aio_completion(reactor* r, file_desc& fd); | |
122 | virtual void complete_with(ssize_t value) override; | |
123 | }; | |
124 | ||
125 | struct smp_wakeup_aio_completion : public fd_kernel_completion, | |
126 | public completion_with_iocb { | |
127 | smp_wakeup_aio_completion(reactor* r, file_desc& fd); | |
128 | virtual void complete_with(ssize_t value) override; | |
129 | }; | |
130 | ||
131 | // Common aio-based Implementation of the task quota and hrtimer. | |
132 | class preempt_io_context { | |
133 | reactor* _r; | |
134 | aio_general_context _context{2}; | |
135 | ||
136 | task_quota_aio_completion _task_quota_aio_completion; | |
137 | hrtimer_aio_completion _hrtimer_aio_completion; | |
138 | public: | |
139 | preempt_io_context(reactor* r, file_desc& task_quota, file_desc& hrtimer); | |
140 | bool service_preempting_io(); | |
141 | ||
142 | size_t flush() { | |
143 | return _context.flush(); | |
144 | } | |
145 | ||
146 | void reset_preemption_monitor(); | |
147 | void request_preemption(); | |
148 | void start_tick(); | |
149 | void stop_tick(); | |
150 | }; | |
151 | ||
152 | // The "reactor_backend" interface provides a method of waiting for various | |
153 | // basic events on one thread. We have one implementation based on epoll and | |
154 | // file-descriptors (reactor_backend_epoll) and one implementation based on | |
155 | // OSv-specific file-descriptor-less mechanisms (reactor_backend_osv). | |
156 | class reactor_backend { | |
157 | public: | |
158 | virtual ~reactor_backend() {}; | |
159 | // The methods below are used to communicate with the kernel. | |
160 | // reap_kernel_completions() will complete any previous async | |
161 | // work that is ready to consume. | |
162 | // kernel_submit_work() submit new events that were produced. | |
163 | // Both of those methods are asynchronous and will never block. | |
164 | // | |
165 | // wait_and_process_events on the other hand may block, and is called when | |
166 | // we are about to go to sleep. | |
167 | virtual bool reap_kernel_completions() = 0; | |
168 | virtual bool kernel_submit_work() = 0; | |
169 | virtual bool kernel_events_can_sleep() const = 0; | |
170 | virtual void wait_and_process_events(const sigset_t* active_sigmask = nullptr) = 0; | |
171 | ||
172 | // Methods that allow polling on file descriptors. This will only work on | |
173 | // reactor_backend_epoll. Other reactor_backend will probably abort if | |
174 | // they are called (which is fine if no file descriptors are waited on): | |
175 | virtual future<> readable(pollable_fd_state& fd) = 0; | |
176 | virtual future<> writeable(pollable_fd_state& fd) = 0; | |
177 | virtual future<> readable_or_writeable(pollable_fd_state& fd) = 0; | |
178 | virtual void forget(pollable_fd_state& fd) noexcept = 0; | |
179 | ||
180 | virtual future<std::tuple<pollable_fd, socket_address>> | |
181 | accept(pollable_fd_state& listenfd) = 0; | |
182 | virtual future<> connect(pollable_fd_state& fd, socket_address& sa) = 0; | |
183 | virtual void shutdown(pollable_fd_state& fd, int how) = 0; | |
184 | virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) = 0; | |
185 | virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) = 0; | |
f67539c2 | 186 | virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) = 0; |
9f95a23c TL |
187 | virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) = 0; |
188 | virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) = 0; | |
189 | ||
190 | virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) = 0; | |
191 | virtual void start_tick() = 0; | |
192 | virtual void stop_tick() = 0; | |
193 | virtual void arm_highres_timer(const ::itimerspec& ts) = 0; | |
194 | virtual void reset_preemption_monitor() = 0; | |
195 | virtual void request_preemption() = 0; | |
196 | virtual void start_handling_signal() = 0; | |
197 | ||
198 | virtual pollable_fd_state_ptr make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) = 0; | |
199 | }; | |
200 | ||
201 | // reactor backend using file-descriptor & epoll, suitable for running on | |
202 | // Linux. Can wait on multiple file descriptors, and converts other events | |
203 | // (such as timers, signals, inter-thread notifications) into file descriptors | |
204 | // using mechanisms like timerfd, signalfd and eventfd respectively. | |
205 | class reactor_backend_epoll : public reactor_backend { | |
206 | reactor* _r; | |
207 | std::thread _task_quota_timer_thread; | |
208 | timer_t _steady_clock_timer = {}; | |
209 | private: | |
210 | file_desc _epollfd; | |
211 | future<> get_epoll_future(pollable_fd_state& fd, int event); | |
212 | void complete_epoll_event(pollable_fd_state& fd, int events, int event); | |
213 | aio_storage_context _storage_context; | |
214 | bool wait_and_process(int timeout, const sigset_t* active_sigmask); | |
215 | bool _need_epoll_events = false; | |
216 | public: | |
217 | explicit reactor_backend_epoll(reactor* r); | |
218 | virtual ~reactor_backend_epoll() override; | |
219 | ||
220 | virtual bool reap_kernel_completions() override; | |
221 | virtual bool kernel_submit_work() override; | |
222 | virtual bool kernel_events_can_sleep() const override; | |
223 | virtual void wait_and_process_events(const sigset_t* active_sigmask) override; | |
224 | virtual future<> readable(pollable_fd_state& fd) override; | |
225 | virtual future<> writeable(pollable_fd_state& fd) override; | |
226 | virtual future<> readable_or_writeable(pollable_fd_state& fd) override; | |
227 | virtual void forget(pollable_fd_state& fd) noexcept override; | |
228 | ||
229 | virtual future<std::tuple<pollable_fd, socket_address>> | |
230 | accept(pollable_fd_state& listenfd) override; | |
231 | virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override; | |
232 | virtual void shutdown(pollable_fd_state& fd, int how) override; | |
233 | virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override; | |
234 | virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override; | |
f67539c2 | 235 | virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override; |
9f95a23c TL |
236 | virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override; |
237 | virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override; | |
238 | ||
239 | virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override; | |
240 | virtual void start_tick() override; | |
241 | virtual void stop_tick() override; | |
242 | virtual void arm_highres_timer(const ::itimerspec& ts) override; | |
243 | virtual void reset_preemption_monitor() override; | |
244 | virtual void request_preemption() override; | |
245 | virtual void start_handling_signal() override; | |
246 | ||
247 | virtual pollable_fd_state_ptr | |
248 | make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override; | |
249 | }; | |
250 | ||
251 | class reactor_backend_aio : public reactor_backend { | |
252 | static constexpr size_t max_polls = 10000; | |
253 | reactor* _r; | |
254 | file_desc _hrtimer_timerfd; | |
255 | aio_storage_context _storage_context; | |
256 | // We use two aio contexts, one for preempting events (the timer tick and | |
257 | // signals), the other for non-preempting events (fd poll). | |
258 | preempt_io_context _preempting_io; // Used for the timer tick and the high resolution timer | |
259 | aio_general_context _polling_io{max_polls}; // FIXME: unify with disk aio_context | |
260 | hrtimer_aio_completion _hrtimer_poll_completion; | |
261 | smp_wakeup_aio_completion _smp_wakeup_aio_completion; | |
262 | static file_desc make_timerfd(); | |
263 | bool await_events(int timeout, const sigset_t* active_sigmask); | |
264 | public: | |
265 | explicit reactor_backend_aio(reactor* r); | |
266 | ||
267 | virtual bool reap_kernel_completions() override; | |
268 | virtual bool kernel_submit_work() override; | |
269 | virtual bool kernel_events_can_sleep() const override; | |
270 | virtual void wait_and_process_events(const sigset_t* active_sigmask) override; | |
271 | future<> poll(pollable_fd_state& fd, int events); | |
272 | virtual future<> readable(pollable_fd_state& fd) override; | |
273 | virtual future<> writeable(pollable_fd_state& fd) override; | |
274 | virtual future<> readable_or_writeable(pollable_fd_state& fd) override; | |
275 | virtual void forget(pollable_fd_state& fd) noexcept override; | |
276 | ||
277 | virtual future<std::tuple<pollable_fd, socket_address>> | |
278 | accept(pollable_fd_state& listenfd) override; | |
279 | virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override; | |
280 | virtual void shutdown(pollable_fd_state& fd, int how) override; | |
281 | virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override; | |
282 | virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override; | |
f67539c2 | 283 | virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override; |
9f95a23c TL |
284 | virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override; |
285 | virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override; | |
286 | ||
287 | virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override; | |
288 | virtual void start_tick() override; | |
289 | virtual void stop_tick() override; | |
290 | virtual void arm_highres_timer(const ::itimerspec& its) override; | |
291 | virtual void reset_preemption_monitor() override; | |
292 | virtual void request_preemption() override; | |
293 | virtual void start_handling_signal() override; | |
294 | ||
295 | virtual pollable_fd_state_ptr | |
296 | make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override; | |
297 | }; | |
298 | ||
299 | #ifdef HAVE_OSV | |
300 | // reactor_backend using OSv-specific features, without any file descriptors. | |
301 | // This implementation cannot currently wait on file descriptors, but unlike | |
302 | // reactor_backend_epoll it doesn't need file descriptors for waiting on a | |
303 | // timer, for example, so file descriptors are not necessary. | |
304 | class reactor_backend_osv : public reactor_backend { | |
305 | private: | |
306 | osv::newpoll::poller _poller; | |
307 | future<> get_poller_future(reactor_notifier_osv *n); | |
308 | promise<> _timer_promise; | |
309 | public: | |
310 | reactor_backend_osv(); | |
311 | virtual ~reactor_backend_osv() override { } | |
312 | ||
313 | virtual bool reap_kernel_completions() override; | |
314 | virtual bool kernel_submit_work() override; | |
315 | virtual bool kernel_events_can_sleep() const override; | |
316 | virtual void wait_and_process_events(const sigset_t* active_sigmask) override; | |
317 | virtual future<> readable(pollable_fd_state& fd) override; | |
318 | virtual future<> writeable(pollable_fd_state& fd) override; | |
319 | virtual void forget(pollable_fd_state& fd) noexcept override; | |
320 | ||
321 | virtual future<std::tuple<pollable_fd, socket_address>> | |
322 | accept(pollable_fd_state& listenfd) override; | |
323 | virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override; | |
324 | virtual void shutdown(pollable_fd_state& fd, int how) override; | |
325 | virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override; | |
326 | virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override; | |
f67539c2 | 327 | virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override; |
9f95a23c TL |
328 | virtual future<size_t> write_some(net::packet& p) override; |
329 | virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override; | |
330 | ||
331 | void enable_timer(steady_clock_type::time_point when); | |
332 | virtual pollable_fd_state_ptr | |
333 | make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override; | |
334 | }; | |
335 | #endif /* HAVE_OSV */ | |
336 | ||
337 | class reactor_backend_selector { | |
338 | std::string _name; | |
339 | private: | |
340 | static bool has_enough_aio_nr(); | |
341 | explicit reactor_backend_selector(std::string name) : _name(std::move(name)) {} | |
342 | public: | |
343 | std::unique_ptr<reactor_backend> create(reactor* r); | |
344 | static reactor_backend_selector default_backend(); | |
345 | static std::vector<reactor_backend_selector> available(); | |
346 | friend std::ostream& operator<<(std::ostream& os, const reactor_backend_selector& rbs) { | |
347 | return os << rbs._name; | |
348 | } | |
349 | friend void validate(boost::any& v, const std::vector<std::string> values, reactor_backend_selector* rbs, int) { | |
350 | namespace bpo = boost::program_options; | |
351 | bpo::validators::check_first_occurrence(v); | |
352 | auto s = bpo::validators::get_single_string(values); | |
353 | for (auto&& x : available()) { | |
354 | if (s == x._name) { | |
355 | v = std::move(x); | |
356 | return; | |
357 | } | |
358 | } | |
359 | throw bpo::validation_error(bpo::validation_error::invalid_option_value); | |
360 | } | |
361 | }; | |
362 | ||
363 | } |