]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2019 ScyllaDB | |
20 | */ | |
21 | ||
22 | #pragma once | |
23 | ||
24 | #include <seastar/core/future.hh> | |
25 | #include <seastar/core/posix.hh> | |
26 | #include <seastar/core/internal/pollable_fd.hh> | |
27 | #include <seastar/core/internal/poll.hh> | |
28 | #include <seastar/core/linux-aio.hh> | |
29 | #include <seastar/core/cacheline.hh> | |
30 | #include <sys/time.h> | |
31 | #include <signal.h> | |
32 | #include <thread> | |
33 | #include <stack> | |
34 | #include <boost/any.hpp> | |
35 | #include <boost/program_options.hpp> | |
36 | #include <boost/container/static_vector.hpp> | |
37 | ||
38 | #ifdef HAVE_OSV | |
39 | #include <osv/newpoll.hh> | |
40 | #endif | |
41 | ||
42 | namespace seastar { | |
43 | ||
44 | class reactor; | |
45 | ||
46 | // FIXME: merge it with storage context below. At this point the | |
47 | // main thing to do is unify the iocb list | |
48 | struct aio_general_context { | |
49 | explicit aio_general_context(size_t nr); | |
50 | ~aio_general_context(); | |
51 | internal::linux_abi::aio_context_t io_context{}; | |
52 | std::unique_ptr<internal::linux_abi::iocb*[]> iocbs; | |
20effc67 TL |
53 | internal::linux_abi::iocb** last; |
54 | internal::linux_abi::iocb** const end; | |
9f95a23c | 55 | void queue(internal::linux_abi::iocb* iocb); |
20effc67 | 56 | // submit all queued iocbs and return their count. |
9f95a23c TL |
57 | size_t flush(); |
58 | }; | |
59 | ||
60 | class aio_storage_context { | |
61 | static constexpr unsigned max_aio = 1024; | |
62 | ||
63 | class iocb_pool { | |
64 | alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool; | |
65 | std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs; | |
66 | public: | |
67 | iocb_pool(); | |
68 | internal::linux_abi::iocb& get_one(); | |
69 | void put_one(internal::linux_abi::iocb* io); | |
70 | unsigned outstanding() const; | |
71 | bool has_capacity() const; | |
72 | }; | |
73 | ||
20effc67 | 74 | reactor& _r; |
9f95a23c TL |
75 | internal::linux_abi::aio_context_t _io_context; |
76 | boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _submission_queue; | |
77 | iocb_pool _iocb_pool; | |
78 | size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec); | |
f67539c2 | 79 | using pending_aio_retry_t = boost::container::static_vector<internal::linux_abi::iocb*, max_aio>; |
20effc67 TL |
80 | pending_aio_retry_t _pending_aio_retry; // Pending retries iocbs |
81 | pending_aio_retry_t _aio_retries; // Currently retried iocbs | |
82 | future<> _pending_aio_retry_fut = make_ready_future<>(); | |
f67539c2 | 83 | internal::linux_abi::io_event _ev_buffer[max_aio]; |
20effc67 TL |
84 | |
85 | bool need_to_retry() const noexcept { | |
86 | return !_pending_aio_retry.empty() || !_aio_retries.empty(); | |
87 | } | |
88 | ||
89 | bool retry_in_progress() const noexcept { | |
90 | return !_pending_aio_retry_fut.available(); | |
91 | } | |
92 | ||
9f95a23c | 93 | public: |
20effc67 | 94 | explicit aio_storage_context(reactor& r); |
9f95a23c TL |
95 | ~aio_storage_context(); |
96 | ||
20effc67 | 97 | bool reap_completions(bool allow_retry = true); |
f67539c2 | 98 | void schedule_retry(); |
9f95a23c TL |
99 | bool submit_work(); |
100 | bool can_sleep() const; | |
20effc67 | 101 | future<> stop() noexcept; |
9f95a23c TL |
102 | }; |
103 | ||
104 | class completion_with_iocb { | |
105 | bool _in_context = false; | |
106 | internal::linux_abi::iocb _iocb; | |
107 | protected: | |
108 | completion_with_iocb(int fd, int events, void* user_data); | |
109 | void completed() { | |
110 | _in_context = false; | |
111 | } | |
112 | public: | |
113 | void maybe_queue(aio_general_context& context); | |
114 | }; | |
115 | ||
116 | class fd_kernel_completion : public kernel_completion { | |
117 | protected: | |
9f95a23c | 118 | file_desc& _fd; |
20effc67 | 119 | fd_kernel_completion(file_desc& fd) : _fd(fd) {} |
9f95a23c TL |
120 | public: |
121 | file_desc& fd() { | |
122 | return _fd; | |
123 | } | |
124 | }; | |
125 | ||
126 | struct hrtimer_aio_completion : public fd_kernel_completion, | |
127 | public completion_with_iocb { | |
20effc67 TL |
128 | private: |
129 | reactor& _r; | |
130 | public: | |
131 | hrtimer_aio_completion(reactor& r, file_desc& fd); | |
9f95a23c TL |
132 | virtual void complete_with(ssize_t value) override; |
133 | }; | |
134 | ||
135 | struct task_quota_aio_completion : public fd_kernel_completion, | |
136 | public completion_with_iocb { | |
20effc67 | 137 | task_quota_aio_completion(file_desc& fd); |
9f95a23c TL |
138 | virtual void complete_with(ssize_t value) override; |
139 | }; | |
140 | ||
141 | struct smp_wakeup_aio_completion : public fd_kernel_completion, | |
142 | public completion_with_iocb { | |
20effc67 | 143 | smp_wakeup_aio_completion(file_desc& fd); |
9f95a23c TL |
144 | virtual void complete_with(ssize_t value) override; |
145 | }; | |
146 | ||
147 | // Common aio-based Implementation of the task quota and hrtimer. | |
148 | class preempt_io_context { | |
20effc67 | 149 | reactor& _r; |
9f95a23c TL |
150 | aio_general_context _context{2}; |
151 | ||
152 | task_quota_aio_completion _task_quota_aio_completion; | |
153 | hrtimer_aio_completion _hrtimer_aio_completion; | |
154 | public: | |
20effc67 | 155 | preempt_io_context(reactor& r, file_desc& task_quota, file_desc& hrtimer); |
9f95a23c TL |
156 | bool service_preempting_io(); |
157 | ||
158 | size_t flush() { | |
159 | return _context.flush(); | |
160 | } | |
161 | ||
162 | void reset_preemption_monitor(); | |
163 | void request_preemption(); | |
164 | void start_tick(); | |
165 | void stop_tick(); | |
166 | }; | |
167 | ||
168 | // The "reactor_backend" interface provides a method of waiting for various | |
169 | // basic events on one thread. We have one implementation based on epoll and | |
170 | // file-descriptors (reactor_backend_epoll) and one implementation based on | |
171 | // OSv-specific file-descriptor-less mechanisms (reactor_backend_osv). | |
172 | class reactor_backend { | |
173 | public: | |
174 | virtual ~reactor_backend() {}; | |
175 | // The methods below are used to communicate with the kernel. | |
176 | // reap_kernel_completions() will complete any previous async | |
177 | // work that is ready to consume. | |
178 | // kernel_submit_work() submit new events that were produced. | |
179 | // Both of those methods are asynchronous and will never block. | |
180 | // | |
181 | // wait_and_process_events on the other hand may block, and is called when | |
182 | // we are about to go to sleep. | |
183 | virtual bool reap_kernel_completions() = 0; | |
184 | virtual bool kernel_submit_work() = 0; | |
185 | virtual bool kernel_events_can_sleep() const = 0; | |
186 | virtual void wait_and_process_events(const sigset_t* active_sigmask = nullptr) = 0; | |
187 | ||
188 | // Methods that allow polling on file descriptors. This will only work on | |
189 | // reactor_backend_epoll. Other reactor_backend will probably abort if | |
190 | // they are called (which is fine if no file descriptors are waited on): | |
191 | virtual future<> readable(pollable_fd_state& fd) = 0; | |
192 | virtual future<> writeable(pollable_fd_state& fd) = 0; | |
193 | virtual future<> readable_or_writeable(pollable_fd_state& fd) = 0; | |
194 | virtual void forget(pollable_fd_state& fd) noexcept = 0; | |
195 | ||
196 | virtual future<std::tuple<pollable_fd, socket_address>> | |
197 | accept(pollable_fd_state& listenfd) = 0; | |
198 | virtual future<> connect(pollable_fd_state& fd, socket_address& sa) = 0; | |
199 | virtual void shutdown(pollable_fd_state& fd, int how) = 0; | |
200 | virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) = 0; | |
201 | virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) = 0; | |
f67539c2 | 202 | virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) = 0; |
9f95a23c TL |
203 | virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) = 0; |
204 | virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) = 0; | |
205 | ||
206 | virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) = 0; | |
207 | virtual void start_tick() = 0; | |
208 | virtual void stop_tick() = 0; | |
209 | virtual void arm_highres_timer(const ::itimerspec& ts) = 0; | |
210 | virtual void reset_preemption_monitor() = 0; | |
211 | virtual void request_preemption() = 0; | |
212 | virtual void start_handling_signal() = 0; | |
213 | ||
214 | virtual pollable_fd_state_ptr make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) = 0; | |
215 | }; | |
216 | ||
217 | // reactor backend using file-descriptor & epoll, suitable for running on | |
218 | // Linux. Can wait on multiple file descriptors, and converts other events | |
219 | // (such as timers, signals, inter-thread notifications) into file descriptors | |
220 | // using mechanisms like timerfd, signalfd and eventfd respectively. | |
221 | class reactor_backend_epoll : public reactor_backend { | |
20effc67 TL |
222 | reactor& _r; |
223 | std::atomic<bool> _highres_timer_pending; | |
9f95a23c | 224 | std::thread _task_quota_timer_thread; |
20effc67 TL |
225 | ::itimerspec _steady_clock_timer_deadline = {}; |
226 | // These two timers are used for high resolution timer<>s, one for | |
227 | // the reactor thread (when sleeping) and one for the timer thread | |
228 | // (when awake). We can't use one timer because of races between the | |
229 | // timer thread and reactor thread. | |
230 | // | |
231 | // Only one of the two is active at any time. | |
232 | file_desc _steady_clock_timer_reactor_thread; | |
233 | file_desc _steady_clock_timer_timer_thread; | |
9f95a23c TL |
234 | private: |
235 | file_desc _epollfd; | |
20effc67 | 236 | void task_quota_timer_thread_fn(); |
9f95a23c TL |
237 | future<> get_epoll_future(pollable_fd_state& fd, int event); |
238 | void complete_epoll_event(pollable_fd_state& fd, int events, int event); | |
239 | aio_storage_context _storage_context; | |
20effc67 TL |
240 | void switch_steady_clock_timers(file_desc& from, file_desc& to); |
241 | void maybe_switch_steady_clock_timers(int timeout, file_desc& from, file_desc& to); | |
9f95a23c | 242 | bool wait_and_process(int timeout, const sigset_t* active_sigmask); |
20effc67 | 243 | bool complete_hrtimer(); |
9f95a23c TL |
244 | bool _need_epoll_events = false; |
245 | public: | |
20effc67 | 246 | explicit reactor_backend_epoll(reactor& r); |
9f95a23c TL |
247 | virtual ~reactor_backend_epoll() override; |
248 | ||
249 | virtual bool reap_kernel_completions() override; | |
250 | virtual bool kernel_submit_work() override; | |
251 | virtual bool kernel_events_can_sleep() const override; | |
252 | virtual void wait_and_process_events(const sigset_t* active_sigmask) override; | |
253 | virtual future<> readable(pollable_fd_state& fd) override; | |
254 | virtual future<> writeable(pollable_fd_state& fd) override; | |
255 | virtual future<> readable_or_writeable(pollable_fd_state& fd) override; | |
256 | virtual void forget(pollable_fd_state& fd) noexcept override; | |
257 | ||
258 | virtual future<std::tuple<pollable_fd, socket_address>> | |
259 | accept(pollable_fd_state& listenfd) override; | |
260 | virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override; | |
261 | virtual void shutdown(pollable_fd_state& fd, int how) override; | |
262 | virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override; | |
263 | virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override; | |
f67539c2 | 264 | virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override; |
9f95a23c TL |
265 | virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override; |
266 | virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override; | |
267 | ||
268 | virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override; | |
269 | virtual void start_tick() override; | |
270 | virtual void stop_tick() override; | |
271 | virtual void arm_highres_timer(const ::itimerspec& ts) override; | |
272 | virtual void reset_preemption_monitor() override; | |
273 | virtual void request_preemption() override; | |
274 | virtual void start_handling_signal() override; | |
275 | ||
276 | virtual pollable_fd_state_ptr | |
277 | make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override; | |
278 | }; | |
279 | ||
280 | class reactor_backend_aio : public reactor_backend { | |
20effc67 TL |
281 | reactor& _r; |
282 | unsigned max_polls() const; | |
9f95a23c TL |
283 | file_desc _hrtimer_timerfd; |
284 | aio_storage_context _storage_context; | |
285 | // We use two aio contexts, one for preempting events (the timer tick and | |
286 | // signals), the other for non-preempting events (fd poll). | |
287 | preempt_io_context _preempting_io; // Used for the timer tick and the high resolution timer | |
20effc67 | 288 | aio_general_context _polling_io{max_polls()}; // FIXME: unify with disk aio_context |
9f95a23c TL |
289 | hrtimer_aio_completion _hrtimer_poll_completion; |
290 | smp_wakeup_aio_completion _smp_wakeup_aio_completion; | |
291 | static file_desc make_timerfd(); | |
292 | bool await_events(int timeout, const sigset_t* active_sigmask); | |
293 | public: | |
20effc67 | 294 | explicit reactor_backend_aio(reactor& r); |
9f95a23c TL |
295 | |
296 | virtual bool reap_kernel_completions() override; | |
297 | virtual bool kernel_submit_work() override; | |
298 | virtual bool kernel_events_can_sleep() const override; | |
299 | virtual void wait_and_process_events(const sigset_t* active_sigmask) override; | |
300 | future<> poll(pollable_fd_state& fd, int events); | |
301 | virtual future<> readable(pollable_fd_state& fd) override; | |
302 | virtual future<> writeable(pollable_fd_state& fd) override; | |
303 | virtual future<> readable_or_writeable(pollable_fd_state& fd) override; | |
304 | virtual void forget(pollable_fd_state& fd) noexcept override; | |
305 | ||
306 | virtual future<std::tuple<pollable_fd, socket_address>> | |
307 | accept(pollable_fd_state& listenfd) override; | |
308 | virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override; | |
309 | virtual void shutdown(pollable_fd_state& fd, int how) override; | |
310 | virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override; | |
311 | virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override; | |
f67539c2 | 312 | virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override; |
9f95a23c TL |
313 | virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override; |
314 | virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override; | |
315 | ||
316 | virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override; | |
317 | virtual void start_tick() override; | |
318 | virtual void stop_tick() override; | |
319 | virtual void arm_highres_timer(const ::itimerspec& its) override; | |
320 | virtual void reset_preemption_monitor() override; | |
321 | virtual void request_preemption() override; | |
322 | virtual void start_handling_signal() override; | |
323 | ||
324 | virtual pollable_fd_state_ptr | |
325 | make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override; | |
326 | }; | |
327 | ||
#ifdef HAVE_OSV
// reactor_backend using OSv-specific features, without any file descriptors.
// This implementation cannot currently wait on file descriptors, but unlike
// reactor_backend_epoll it doesn't need file descriptors for waiting on a
// timer, for example, so file descriptors are not necessary.
//
// NOTE(review): this class still does not override several pure virtuals of
// reactor_backend (readable_or_writeable, signal_received, start_tick,
// stop_tick, arm_highres_timer, reset_preemption_monitor, request_preemption,
// start_handling_signal), so it remains abstract and cannot be instantiated.
class reactor_backend_osv : public reactor_backend {
private:
    osv::newpoll::poller _poller;
    future<> get_poller_future(reactor_notifier_osv *n);
    promise<> _timer_promise;
public:
    reactor_backend_osv();
    virtual ~reactor_backend_osv() override { }

    virtual bool reap_kernel_completions() override;
    virtual bool kernel_submit_work() override;
    virtual bool kernel_events_can_sleep() const override;
    virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
    virtual future<> readable(pollable_fd_state& fd) override;
    virtual future<> writeable(pollable_fd_state& fd) override;
    virtual void forget(pollable_fd_state& fd) noexcept override;

    virtual future<std::tuple<pollable_fd, socket_address>>
    accept(pollable_fd_state& listenfd) override;
    virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
    virtual void shutdown(pollable_fd_state& fd, int how) override;
    virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
    virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
    virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
    // Must match reactor_backend::write_some(pollable_fd_state&, net::packet&).
    // The former one-argument form overrode nothing, which is ill-formed
    // when marked `override`.
    virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
    virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;

    void enable_timer(steady_clock_type::time_point when);
    virtual pollable_fd_state_ptr
    make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
};
#endif /* HAVE_OSV */
365 | ||
366 | class reactor_backend_selector { | |
367 | std::string _name; | |
368 | private: | |
369 | static bool has_enough_aio_nr(); | |
370 | explicit reactor_backend_selector(std::string name) : _name(std::move(name)) {} | |
371 | public: | |
20effc67 TL |
372 | const std::string& name() const { return _name; } |
373 | std::unique_ptr<reactor_backend> create(reactor& r); | |
9f95a23c TL |
374 | static reactor_backend_selector default_backend(); |
375 | static std::vector<reactor_backend_selector> available(); | |
376 | friend std::ostream& operator<<(std::ostream& os, const reactor_backend_selector& rbs) { | |
377 | return os << rbs._name; | |
378 | } | |
9f95a23c TL |
379 | }; |
380 | ||
381 | } |