2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright 2019 ScyllaDB
24 #include <seastar/core/future.hh>
25 #include <seastar/core/posix.hh>
26 #include <seastar/core/internal/pollable_fd.hh>
27 #include <seastar/core/internal/poll.hh>
28 #include <seastar/core/linux-aio.hh>
29 #include <seastar/core/cacheline.hh>
34 #include <boost/any.hpp>
35 #include <boost/program_options.hpp>
36 #include <boost/container/static_vector.hpp>
39 #include <osv/newpoll.hh>
46 // FIXME: merge it with storage context below. At this point the
47 // main thing to do is unify the iocb list
// Wraps one Linux aio context together with a heap-allocated array of iocb
// pointers that queue() collects; presumably the collected iocbs are later
// submitted to `io_context` as one batch — confirm in the implementation.
48 struct aio_general_context {
// `nr` presumably sizes both the kernel context and the `iocbs` array — TODO confirm.
49 explicit aio_general_context(size_t nr);
50 ~aio_general_context();
// Kernel aio context handle; value-initialized by `{}` (presumably set up by the constructor).
51 internal::linux_abi::aio_context_t io_context{};
// Array of iocb pointers collected by queue().
52 std::unique_ptr<internal::linux_abi::iocb*[]> iocbs;
// Cursor into `iocbs`, starting at `iocbs.get()`. This default initializer
// reads `iocbs`, so `iocbs` must stay declared before `last`: non-static
// members are initialized in declaration order.
53 internal::linux_abi::iocb** last = iocbs.get();
// Adds `iocb` to the array (presumably stores it at `*last` and advances `last`).
54 void queue(internal::linux_abi::iocb* iocb);
// Owns the aio context used for storage (disk) I/O and the fixed-capacity
// buffers around it: an iocb pool with its free list, the submission queue,
// the retry list and the completion-event buffer, all bounded by max_aio.
58 class aio_storage_context {
// Shared capacity bound: sizes the iocb pool, the free-iocb stack, the
// submission queue, the retry list and the event buffer below.
59 static constexpr unsigned max_aio = 1024;
// Backing storage for every iocb, aligned to a cache line (presumably to keep
// it off lines shared with neighbouring data — confirm).
62 alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
// Free list over the pool: a LIFO stack backed by a fixed-capacity
// static_vector, so it never allocates dynamically.
63 std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
// Pool accessors. Presumably get_one() takes a free iocb, put_one() returns
// one, outstanding() counts iocbs in use and has_capacity() reports whether a
// free iocb remains — confirm against the implementation.
66 internal::linux_abi::iocb& get_one();
67 void put_one(internal::linux_abi::iocb* io);
68 unsigned outstanding() const;
69 bool has_capacity() const;
// Kernel handle of the storage aio context.
73 internal::linux_abi::aio_context_t _io_context;
// iocbs prepared for submission, at most max_aio of them.
74 boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _submission_queue;
// Handles a failed submission of `iocb` with error code `ec`; the meaning of
// the returned count is not stated here — TODO confirm.
76 size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
// iocbs awaiting another submission attempt (presumably after a transient
// error), driven by schedule_retry().
77 using pending_aio_retry_t = boost::container::static_vector<internal::linux_abi::iocb*, max_aio>;
78 pending_aio_retry_t _pending_aio_retry;
// Destination for reaped completion events, max_aio slots.
79 internal::linux_abi::io_event _ev_buffer[max_aio];
81 explicit aio_storage_context(reactor* r);
82 ~aio_storage_context();
// Reaps finished events from the kernel; presumably returns true if any were processed.
84 bool reap_completions();
85 void schedule_retry();
// Presumably true when nothing is in flight, so the reactor may block.
87 bool can_sleep() const;
// Base for completions delivered through a single embedded iocb that can be
// queued into an aio_general_context.
90 class completion_with_iocb {
// Presumably true while `_iocb` sits in a context and has not completed yet,
// so that maybe_queue() does not queue it twice — confirm.
91 bool _in_context = false;
92 internal::linux_abi::iocb _iocb;
// Presumably prepares `_iocb` as a poll for `events` on `fd`, tagged with
// `user_data` — TODO confirm.
94 completion_with_iocb(int fd, int events, void* user_data);
// Presumably queues `_iocb` into `context` only if it is not already queued
// (see `_in_context`).
99 void maybe_queue(aio_general_context& context);
// kernel_completion bound to a reactor and a file descriptor; the *_aio_completion
// types combine it with completion_with_iocb.
102 class fd_kernel_completion : public kernel_completion {
// Stores `r` and `fd`. NOTE(review): if `_fd` is a reference member, the
// caller's `fd` must outlive this completion — confirm against its declaration.
106 fd_kernel_completion(reactor* r, file_desc& fd) : _r(r), _fd(fd) {}
// Completion for the high-resolution timer (presumably a timerfd), delivered
// through an aio iocb. Bases are constructed in the listed order:
// fd_kernel_completion first, then completion_with_iocb.
113 struct hrtimer_aio_completion : public fd_kernel_completion,
114 public completion_with_iocb {
115 hrtimer_aio_completion(reactor* r, file_desc& fd);
// Presumably called with the result of the finished iocb.
116 virtual void complete_with(ssize_t value) override;
// Completion for the task-quota timer fd, delivered through an aio iocb.
// Bases are constructed in the listed order: fd_kernel_completion first,
// then completion_with_iocb.
119 struct task_quota_aio_completion : public fd_kernel_completion,
120 public completion_with_iocb {
121 task_quota_aio_completion(reactor* r, file_desc& fd);
// Presumably called with the result of the finished iocb.
122 virtual void complete_with(ssize_t value) override;
// Completion for the cross-shard (smp) wakeup fd, delivered through an aio
// iocb. Bases are constructed in the listed order: fd_kernel_completion first,
// then completion_with_iocb.
125 struct smp_wakeup_aio_completion : public fd_kernel_completion,
126 public completion_with_iocb {
127 smp_wakeup_aio_completion(reactor* r, file_desc& fd);
// Presumably called with the result of the finished iocb.
128 virtual void complete_with(ssize_t value) override;
131 // Common aio-based implementation of the task-quota timer and the hrtimer.
// It owns one small aio_general_context plus the two completions that
// presumably get queued into it.
132 class preempt_io_context {
// Capacity 2, presumably one slot per completion below.
134 aio_general_context _context{2};
136 task_quota_aio_completion _task_quota_aio_completion;
137 hrtimer_aio_completion _hrtimer_aio_completion;
139 preempt_io_context(reactor* r, file_desc& task_quota, file_desc& hrtimer);
// Presumably reaps and dispatches completed preempting events; the meaning of
// the bool result is not stated here — TODO confirm.
140 bool service_preempting_io();
// Delegates to flush() of the owned aio_general_context.
143 return _context.flush();
146 void reset_preemption_monitor();
147 void request_preemption();
152 // The "reactor_backend" interface provides a method of waiting for various
153 // basic events on one thread. This header declares reactor_backend_epoll
154 // (epoll and file descriptors), reactor_backend_aio (Linux aio contexts)
155 // and, when HAVE_OSV is defined, reactor_backend_osv (OSv-specific
// file-descriptor-less mechanisms).
156 class reactor_backend {
158 virtual ~reactor_backend() {};
159 // The methods below are used to communicate with the kernel.
160 // reap_kernel_completions() will complete any previous async
161 // work that is ready to consume.
162 // kernel_submit_work() submits new events that were produced.
163 // Both of those methods are asynchronous and will never block.
165 // wait_and_process_events(), on the other hand, may block, and is called
166 // when we are about to go to sleep.
// NOTE(review): the `= nullptr` default of wait_and_process_events() is bound
// statically. It applies only to calls made through a reactor_backend
// pointer/reference; the overrides in the derived backends declare no default.
167 virtual bool reap_kernel_completions() = 0;
168 virtual bool kernel_submit_work() = 0;
169 virtual bool kernel_events_can_sleep() const = 0;
170 virtual void wait_and_process_events(const sigset_t* active_sigmask = nullptr) = 0;
172 // Methods that allow polling on file descriptors. reactor_backend_epoll and
173 // reactor_backend_aio implement them; reactor_backend_osv, which cannot wait on
174 // file descriptors, will probably abort if they are called (which is fine if no file descriptors are waited on):
175 virtual future<> readable(pollable_fd_state& fd) = 0;
176 virtual future<> writeable(pollable_fd_state& fd) = 0;
177 virtual future<> readable_or_writeable(pollable_fd_state& fd) = 0;
178 virtual void forget(pollable_fd_state& fd) noexcept = 0;
// Socket-style operations on a pollable fd.
180 virtual future<std::tuple<pollable_fd, socket_address>>
181 accept(pollable_fd_state& listenfd) = 0;
182 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) = 0;
183 virtual void shutdown(pollable_fd_state& fd, int how) = 0;
184 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) = 0;
185 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) = 0;
186 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) = 0;
187 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) = 0;
188 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) = 0;
// Signal, timer-tick, high-resolution-timer and preemption hooks.
190 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) = 0;
191 virtual void start_tick() = 0;
192 virtual void stop_tick() = 0;
193 virtual void arm_highres_timer(const ::itimerspec& ts) = 0;
194 virtual void reset_preemption_monitor() = 0;
195 virtual void request_preemption() = 0;
196 virtual void start_handling_signal() = 0;
// Creates the backend-specific state for `fd`. `fd` is taken by value,
// presumably handing the descriptor over to the returned state.
198 virtual pollable_fd_state_ptr make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) = 0;
201 // reactor backend using file-descriptor & epoll, suitable for running on
202 // Linux. Can wait on multiple file descriptors, and converts other events
203 // (such as timers, signals, inter-thread notifications) into file descriptors
204 // using mechanisms like timerfd, signalfd and eventfd respectively.
205 class reactor_backend_epoll : public reactor_backend {
// NOTE(review): destroying a std::thread that is still joinable calls
// std::terminate(); presumably ~reactor_backend_epoll() joins it — confirm.
207 std::thread _task_quota_timer_thread;
// POSIX timer handle, value-initialized by `{}`.
208 timer_t _steady_clock_timer = {};
// Presumably returns a future resolved once `event` is reported for `fd`;
// complete_epoll_event() presumably resolves it from the returned `events`.
211 future<> get_epoll_future(pollable_fd_state& fd, int event);
212 void complete_epoll_event(pollable_fd_state& fd, int events, int event);
// Storage I/O still goes through an aio context, separate from epoll.
213 aio_storage_context _storage_context;
214 bool wait_and_process(int timeout, const sigset_t* active_sigmask);
// Presumably set when an epoll wait is needed to collect pending fd events.
215 bool _need_epoll_events = false;
217 explicit reactor_backend_epoll(reactor* r);
218 virtual ~reactor_backend_epoll() override;
220 virtual bool reap_kernel_completions() override;
221 virtual bool kernel_submit_work() override;
222 virtual bool kernel_events_can_sleep() const override;
223 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
224 virtual future<> readable(pollable_fd_state& fd) override;
225 virtual future<> writeable(pollable_fd_state& fd) override;
226 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
227 virtual void forget(pollable_fd_state& fd) noexcept override;
229 virtual future<std::tuple<pollable_fd, socket_address>>
230 accept(pollable_fd_state& listenfd) override;
231 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
232 virtual void shutdown(pollable_fd_state& fd, int how) override;
233 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
234 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
235 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
236 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
237 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
239 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
240 virtual void start_tick() override;
241 virtual void stop_tick() override;
242 virtual void arm_highres_timer(const ::itimerspec& ts) override;
243 virtual void reset_preemption_monitor() override;
244 virtual void request_preemption() override;
245 virtual void start_handling_signal() override;
247 virtual pollable_fd_state_ptr
248 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
// reactor backend built on Linux aio contexts: one for storage I/O, one for
// preempting events and one for fd polling.
251 class reactor_backend_aio : public reactor_backend {
// Capacity of `_polling_io`, presumably the maximum number of fd polls queued at once.
252 static constexpr size_t max_polls = 10000;
// Declared before `_preempting_io` and `_hrtimer_poll_completion`, whose
// constructors take a file_desc&, so it is constructed before either of them
// (presumably they are handed this fd — confirm).
254 file_desc _hrtimer_timerfd;
255 aio_storage_context _storage_context;
256 // Besides the storage context above, we use two aio contexts: one for preempting
257 // events (the task-quota tick and the hrtimer), the other for non-preempting events (fd poll).
258 preempt_io_context _preempting_io; // Used for the timer tick and the high resolution timer
259 aio_general_context _polling_io{max_polls}; // FIXME: unify with disk aio_context
// A second hrtimer completion, separate from the one inside `_preempting_io`;
// presumably queued on `_polling_io` — confirm.
260 hrtimer_aio_completion _hrtimer_poll_completion;
261 smp_wakeup_aio_completion _smp_wakeup_aio_completion;
// Presumably creates the fd stored in `_hrtimer_timerfd`.
262 static file_desc make_timerfd();
263 bool await_events(int timeout, const sigset_t* active_sigmask);
265 explicit reactor_backend_aio(reactor* r);
267 virtual bool reap_kernel_completions() override;
268 virtual bool kernel_submit_work() override;
269 virtual bool kernel_events_can_sleep() const override;
270 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
// Non-virtual helper, presumably shared by readable()/writeable()/readable_or_writeable().
271 future<> poll(pollable_fd_state& fd, int events);
272 virtual future<> readable(pollable_fd_state& fd) override;
273 virtual future<> writeable(pollable_fd_state& fd) override;
274 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
275 virtual void forget(pollable_fd_state& fd) noexcept override;
277 virtual future<std::tuple<pollable_fd, socket_address>>
278 accept(pollable_fd_state& listenfd) override;
279 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
280 virtual void shutdown(pollable_fd_state& fd, int how) override;
281 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
282 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
283 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
284 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
285 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
287 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
288 virtual void start_tick() override;
289 virtual void stop_tick() override;
290 virtual void arm_highres_timer(const ::itimerspec& its) override;
291 virtual void reset_preemption_monitor() override;
292 virtual void request_preemption() override;
293 virtual void start_handling_signal() override;
295 virtual pollable_fd_state_ptr
296 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
300 // reactor_backend using OSv-specific features, without any file descriptors.
301 // This implementation cannot currently wait on file descriptors, but unlike
302 // reactor_backend_epoll it doesn't need file descriptors for waiting on a
303 // timer, for example, so file descriptors are not necessary.
// NOTE(review): this class does not appear to override readable_or_writeable(),
// signal_received(), start_tick(), stop_tick(), arm_highres_timer(),
// reset_preemption_monitor(), request_preemption() or start_handling_signal(),
// all pure virtual in reactor_backend. Until they are added the class stays
// abstract and cannot be instantiated — confirm.
304 class reactor_backend_osv : public reactor_backend {
// OSv poller used instead of file-descriptor based waiting.
306 osv::newpoll::poller _poller;
// Presumably returns a future resolved when notifier `n` fires.
307 future<> get_poller_future(reactor_notifier_osv *n);
// Presumably fulfilled when the timer armed by enable_timer() expires.
308 promise<> _timer_promise;
310 reactor_backend_osv();
311 virtual ~reactor_backend_osv() override { }
313 virtual bool reap_kernel_completions() override;
314 virtual bool kernel_submit_work() override;
315 virtual bool kernel_events_can_sleep() const override;
316 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
317 virtual future<> readable(pollable_fd_state& fd) override;
318 virtual future<> writeable(pollable_fd_state& fd) override;
319 virtual void forget(pollable_fd_state& fd) noexcept override;
321 virtual future<std::tuple<pollable_fd, socket_address>>
322 accept(pollable_fd_state& listenfd) override;
323 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
324 virtual void shutdown(pollable_fd_state& fd, int how) override;
325 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
326 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
327 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
// Must match reactor_backend::write_some(pollable_fd_state&, net::packet&)
// exactly. A parameter list that matches no base virtual makes `override`
// ill-formed.
328 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
329 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
// Arms the backend timer for time point `when` (non-virtual, OSv-specific).
331 void enable_timer(steady_clock_type::time_point when);
332 virtual pollable_fd_state_ptr
333 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
335 #endif /* HAVE_OSV */
// Names one reactor backend (one of available()) and can create it. It also
// acts as a boost::program_options value type through the validate() overload below.
337 class reactor_backend_selector {
// Presumably checks that the system aio limit is high enough for the aio backend — confirm.
340 static bool has_enough_aio_nr();
341 explicit reactor_backend_selector(std::string name) : _name(std::move(name)) {}
// Instantiates the backend this selector names.
343 std::unique_ptr<reactor_backend> create(reactor* r);
344 static reactor_backend_selector default_backend();
345 static std::vector<reactor_backend_selector> available();
// Prints the backend name.
346 friend std::ostream& operator<<(std::ostream& os, const reactor_backend_selector& rbs) {
347 return os << rbs._name;
// Custom Boost.Program_options validator, found by argument-dependent lookup.
// The trailing `int` follows Boost's convention for user overloads. It rejects
// repeated occurrences, reads the single string value and matches it against
// available(); validation_error(invalid_option_value) is thrown, presumably
// when no available backend matches.
// NOTE(review): `values` is taken by const value, which copies the vector on
// every call. `const std::vector<std::string>&`, as in the Boost documentation,
// would avoid the copy.
349 friend void validate(boost::any& v, const std::vector<std::string> values, reactor_backend_selector* rbs, int) {
350 namespace bpo = boost::program_options;
351 bpo::validators::check_first_occurrence(v);
352 auto s = bpo::validators::get_single_string(values);
353 for (auto&& x : available()) {
359 throw bpo::validation_error(bpo::validation_error::invalid_option_value);