2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright 2019 ScyllaDB
24 #include <seastar/core/future.hh>
25 #include <seastar/core/posix.hh>
26 #include <seastar/core/internal/pollable_fd.hh>
27 #include <seastar/core/internal/poll.hh>
28 #include <seastar/core/linux-aio.hh>
29 #include <seastar/core/cacheline.hh>
34 #include <boost/any.hpp>
35 #include <boost/program_options.hpp>
36 #include <boost/container/static_vector.hpp>
39 #include <osv/newpoll.hh>
46 // FIXME: merge it with storage context below. At this point the
47 // main thing to do is unify the iocb list
// Bundles a Linux AIO context handle with an owning array of iocb pointers.
// NOTE(review): preempt_io_context calls _context.flush(), which is not
// declared in the lines visible here -- the tail of this struct appears to be
// missing from this excerpt.
48 struct aio_general_context {
// nr: presumably the number of iocb slots allocated for `iocbs` -- confirm
// against the out-of-line definition.
49 explicit aio_general_context(size_t nr);
50 ~aio_general_context();
// AIO context handle; value-initialized by the default member initializer.
51 internal::linux_abi::aio_context_t io_context{};
// Owning array of iocb pointers.
52 std::unique_ptr<internal::linux_abi::iocb*[]> iocbs;
// Defaults to the start of `iocbs`; `iocbs` is declared first, so it is
// initialized before this member. Presumably tracks the next free slot --
// confirm in queue().
53 internal::linux_abi::iocb** last = iocbs.get();
// Presumably records `iocb` for a later submission -- confirm.
54 void queue(internal::linux_abi::iocb* iocb);
// Bookkeeping for storage AIO: a fixed pool of iocbs, a stack of iocb
// pointers, a submission queue and a retry list, all bounded by max_aio.
// NOTE(review): several original lines (access specifiers and possibly a
// nested scope around the pool members) are missing from this excerpt, so the
// exact nesting/visibility of the members below cannot be determined here.
58 class aio_storage_context {
// Size of the iocb pool and capacity of every fixed-capacity container below.
59 static constexpr unsigned max_aio = 1024;
// Backing storage for max_aio iocbs, aligned to cache_line_size.
62 alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
// Stack of iocb pointers stored in a static_vector of capacity max_aio.
// Presumably the currently unused entries of _iocb_pool -- confirm.
63 std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
// Presumably hands out a free iocb / returns one to the free stack -- confirm.
66 internal::linux_abi::iocb& get_one();
67 void put_one(internal::linux_abi::iocb* io);
// Presumably the number of iocbs currently handed out -- confirm.
68 unsigned outstanding() const;
// Presumably true while a free iocb is still available -- confirm.
69 bool has_capacity() const;
// AIO context handle. Unlike aio_general_context::io_context it has no
// default member initializer here, so the constructor must set it.
73 internal::linux_abi::aio_context_t _io_context;
// Fixed-capacity list of iocb pointers; presumably those awaiting submission.
74 boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _submission_queue;
// Handles error code `ec` for `iocb`; the meaning of the returned size_t is
// not visible here -- see the definition.
76 size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
// Fixed-capacity list of iocb pointers; presumably those to be retried.
77 boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio_retry;
// explicit: prevents implicit conversion from reactor*.
79 explicit aio_storage_context(reactor* r);
80 ~aio_storage_context();
// Returns bool; presumably whether any completion was processed -- confirm.
82 bool reap_completions();
// Const query; presumably whether the reactor may sleep with respect to
// storage I/O -- confirm.
84 bool can_sleep() const;
// Pairs a single embedded iocb with a flag recording whether it is
// "in context". Used as a base class by the *_aio_completion types.
87 class completion_with_iocb {
// Starts out false. Presumably set once _iocb has been queued into an
// aio_general_context and cleared on completion -- confirm.
88 bool _in_context = false;
// The iocb owned by this object.
89 internal::linux_abi::iocb _iocb;
// fd/events/user_data are presumably used to prepare _iocb as a poll
// request -- confirm in the definition.
91 completion_with_iocb(int fd, int events, void* user_data);
// Presumably queues _iocb into `context` only when it is not already
// in context (hence "maybe") -- confirm.
96 void maybe_queue(aio_general_context& context);
// A kernel_completion associated with a reactor and a file descriptor.
// NOTE(review): the _r and _fd member declarations are not visible in this
// excerpt.
99 class fd_kernel_completion : public kernel_completion {
// Stores the reactor pointer and the file_desc in _r and _fd. If _fd is a
// reference member, `fd` must outlive this object -- confirm the member type.
103 fd_kernel_completion(reactor* r, file_desc& fd) : _r(r), _fd(fd) {}
// Completion for the high-resolution timer fd (per the name). Combines
// fd_kernel_completion with completion_with_iocb, so it carries its own iocb.
110 struct hrtimer_aio_completion : public fd_kernel_completion,
111 public completion_with_iocb {
112 hrtimer_aio_completion(reactor* r, file_desc& fd);
// Overrides a virtual complete_with(); `value` is presumably the result of
// the completed operation -- confirm.
113 virtual void complete_with(ssize_t value) override;
// Completion for the task-quota timer fd (per the name). Combines
// fd_kernel_completion with completion_with_iocb, so it carries its own iocb.
116 struct task_quota_aio_completion : public fd_kernel_completion,
117 public completion_with_iocb {
118 task_quota_aio_completion(reactor* r, file_desc& fd);
// Overrides a virtual complete_with(); `value` is presumably the result of
// the completed operation -- confirm.
119 virtual void complete_with(ssize_t value) override;
// Completion for the cross-CPU wakeup fd (per the name). Combines
// fd_kernel_completion with completion_with_iocb, so it carries its own iocb.
122 struct smp_wakeup_aio_completion : public fd_kernel_completion,
123 public completion_with_iocb {
124 smp_wakeup_aio_completion(reactor* r, file_desc& fd);
// Overrides a virtual complete_with(); `value` is presumably the result of
// the completed operation -- confirm.
125 virtual void complete_with(ssize_t value) override;
128 // Common aio-based Implementation of the task quota and hrtimer.
// Owns a small aio_general_context plus the two completions (task quota and
// hrtimer) that are served through it.
129 class preempt_io_context {
// Constructed with nr == 2; presumably one slot per completion declared
// below -- confirm.
131 aio_general_context _context{2};
133 task_quota_aio_completion _task_quota_aio_completion;
134 hrtimer_aio_completion _hrtimer_aio_completion;
// Takes the reactor and the two file descriptors that the completions above
// are constructed from (each completion constructor takes a reactor* and a
// file_desc&).
136 preempt_io_context(reactor* r, file_desc& task_quota, file_desc& hrtimer);
// Returns bool; presumably whether any preempting event was handled -- confirm.
137 bool service_preempting_io();
// NOTE(review): the header of the inline member function that contains this
// return statement is missing from this excerpt; it forwards to
// _context.flush().
140 return _context.flush();
143 void reset_preemption_monitor();
144 void request_preemption();
149 // The "reactor_backend" interface provides a method of waiting for various
150 // basic events on one thread. There are implementations based on epoll and
151 // file-descriptors (reactor_backend_epoll), on Linux AIO (reactor_backend_aio),
152 // and on OSv-specific file-descriptor-less mechanisms (reactor_backend_osv).
// Abstract interface implemented by every reactor backend. All operations
// are pure virtual; concrete backends override them.
153 class reactor_backend {
// Polymorphic base: virtual destructor so a backend can be destroyed through
// a reactor_backend pointer (e.g. a std::unique_ptr<reactor_backend>).
// Defaulted rather than hand-written with an empty body.
155 virtual ~reactor_backend() = default;
156 // The methods below are used to communicate with the kernel.
157 // reap_kernel_completions() will complete any previous async
158 // work that is ready to consume.
159 // kernel_submit_work() submit new events that were produced.
160 // Both of those methods are asynchronous and will never block.
162 // wait_and_process_events on the other hand may block, and is called when
163 // we are about to go to sleep.
164 virtual bool reap_kernel_completions() = 0;
165 virtual bool kernel_submit_work() = 0;
166 virtual bool kernel_events_can_sleep() const = 0;
// active_sigmask defaults to nullptr here; overriders declare the parameter
// without a default, so the default only applies to calls made through this
// base-class interface.
167 virtual void wait_and_process_events(const sigset_t* active_sigmask = nullptr) = 0;
169 // Methods that allow polling on file descriptors. This will only work on
170 // reactor_backend_epoll. Other reactor_backend will probably abort if
171 // they are called (which is fine if no file descriptors are waited on):
// NOTE(review): reactor_backend_aio also declares overrides for these
// methods; the comment above may predate it -- confirm.
172 virtual future<> readable(pollable_fd_state& fd) = 0;
173 virtual future<> writeable(pollable_fd_state& fd) = 0;
174 virtual future<> readable_or_writeable(pollable_fd_state& fd) = 0;
175 virtual void forget(pollable_fd_state& fd) noexcept = 0;
// Socket-style operations on a pollable fd.
177 virtual future<std::tuple<pollable_fd, socket_address>>
178 accept(pollable_fd_state& listenfd) = 0;
179 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) = 0;
180 virtual void shutdown(pollable_fd_state& fd, int how) = 0;
181 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) = 0;
182 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) = 0;
183 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) = 0;
184 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) = 0;
// Signal, tick-timer, high-resolution-timer and preemption hooks.
186 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) = 0;
187 virtual void start_tick() = 0;
188 virtual void stop_tick() = 0;
189 virtual void arm_highres_timer(const ::itimerspec& ts) = 0;
190 virtual void reset_preemption_monitor() = 0;
191 virtual void request_preemption() = 0;
192 virtual void start_handling_signal() = 0;
// Factory for the backend-specific pollable_fd_state; takes the file_desc by
// value.
194 virtual pollable_fd_state_ptr make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) = 0;
197 // reactor backend using file-descriptor & epoll, suitable for running on
198 // Linux. Can wait on multiple file descriptors, and converts other events
199 // (such as timers, signals, inter-thread notifications) into file descriptors
200 // using mechanisms like timerfd, signalfd and eventfd respectively.
// epoll-based implementation of the reactor_backend interface.
// NOTE(review): access specifiers and some member lines are missing from
// this excerpt.
201 class reactor_backend_epoll : public reactor_backend {
// Thread object; presumably runs the task-quota timer -- confirm.
203 std::thread _task_quota_timer_thread;
// POSIX timer handle, value-initialized here.
204 timer_t _steady_clock_timer = {};
// Helpers taking an fd state and event mask(s); presumably used to
// implement readable()/writeable() on top of epoll -- confirm.
207 future<> get_epoll_future(pollable_fd_state& fd, int event);
208 void complete_epoll_event(pollable_fd_state& fd, int events, int event);
// Storage AIO bookkeeping owned by this backend.
209 aio_storage_context _storage_context;
// `timeout` is presumably passed on to an epoll wait call (units not
// visible here) -- confirm.
210 bool wait_and_process(int timeout, const sigset_t* active_sigmask);
// Starts out false; its exact meaning is defined by the implementation.
211 bool _need_epoll_events = false;
213 explicit reactor_backend_epoll(reactor* r);
214 virtual ~reactor_backend_epoll() override;
// Overrides of the reactor_backend interface.
216 virtual bool reap_kernel_completions() override;
217 virtual bool kernel_submit_work() override;
218 virtual bool kernel_events_can_sleep() const override;
219 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
220 virtual future<> readable(pollable_fd_state& fd) override;
221 virtual future<> writeable(pollable_fd_state& fd) override;
222 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
223 virtual void forget(pollable_fd_state& fd) noexcept override;
225 virtual future<std::tuple<pollable_fd, socket_address>>
226 accept(pollable_fd_state& listenfd) override;
227 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
228 virtual void shutdown(pollable_fd_state& fd, int how) override;
229 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
230 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
231 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
232 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
234 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
235 virtual void start_tick() override;
236 virtual void stop_tick() override;
237 virtual void arm_highres_timer(const ::itimerspec& ts) override;
238 virtual void reset_preemption_monitor() override;
239 virtual void request_preemption() override;
240 virtual void start_handling_signal() override;
242 virtual pollable_fd_state_ptr
243 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
// Implementation of the reactor_backend interface built on the AIO context
// types declared earlier in this file (aio_storage_context,
// preempt_io_context, aio_general_context).
// NOTE(review): access specifiers and some member lines are missing from
// this excerpt.
246 class reactor_backend_aio : public reactor_backend {
// Value passed as `nr` when constructing _polling_io below.
247 static constexpr size_t max_polls = 10000;
// File descriptor for the high-resolution timer (a timerfd, per the name;
// see make_timerfd()).
249 file_desc _hrtimer_timerfd;
250 aio_storage_context _storage_context;
251 // We use two aio contexts, one for preempting events (the timer tick and
252 // signals), the other for non-preempting events (fd poll).
// NOTE(review): the comment above mentions signals, while the trailing
// comment on _preempting_io mentions the high resolution timer -- confirm
// which events are actually routed through the preempting context.
253 preempt_io_context _preempting_io; // Used for the timer tick and the high resolution timer
254 aio_general_context _polling_io{max_polls}; // FIXME: unify with disk aio_context
// Completions for the hrtimer fd and the smp wakeup fd; presumably queued
// through _polling_io -- confirm.
255 hrtimer_aio_completion _hrtimer_poll_completion;
256 smp_wakeup_aio_completion _smp_wakeup_aio_completion;
// Static factory returning a file_desc; presumably creates the timerfd held
// in _hrtimer_timerfd -- confirm.
257 static file_desc make_timerfd();
// `timeout` units are not visible here; see the definition.
258 bool await_events(int timeout, const sigset_t* active_sigmask);
260 explicit reactor_backend_aio(reactor* r);
// Overrides of the reactor_backend interface.
262 virtual bool reap_kernel_completions() override;
263 virtual bool kernel_submit_work() override;
264 virtual bool kernel_events_can_sleep() const override;
265 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
// Non-virtual helper taking an event mask; presumably shared by
// readable()/writeable()/readable_or_writeable() -- confirm.
266 future<> poll(pollable_fd_state& fd, int events);
267 virtual future<> readable(pollable_fd_state& fd) override;
268 virtual future<> writeable(pollable_fd_state& fd) override;
269 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
270 virtual void forget(pollable_fd_state& fd) noexcept override;
272 virtual future<std::tuple<pollable_fd, socket_address>>
273 accept(pollable_fd_state& listenfd) override;
274 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
275 virtual void shutdown(pollable_fd_state& fd, int how) override;
276 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
277 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
278 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
279 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
281 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
282 virtual void start_tick() override;
283 virtual void stop_tick() override;
284 virtual void arm_highres_timer(const ::itimerspec& its) override;
285 virtual void reset_preemption_monitor() override;
286 virtual void request_preemption() override;
287 virtual void start_handling_signal() override;
289 virtual pollable_fd_state_ptr
290 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
294 // reactor_backend using OSv-specific features, without any file descriptors.
295 // This implementation cannot currently wait on file descriptors, but unlike
296 // reactor_backend_epoll it doesn't need file descriptors for waiting on a
297 // timer, for example, so file descriptors are not necessary.
// OSv implementation of the reactor_backend interface, built around an
// osv::newpoll::poller instead of file descriptors.
// NOTE(review): as visible here, this class declares no overrides for several
// pure virtuals of reactor_backend (e.g. readable_or_writeable,
// signal_received, start_tick, stop_tick, arm_highres_timer,
// reset_preemption_monitor, request_preemption, start_handling_signal), so it
// would remain abstract -- confirm whether lines are missing from this excerpt.
298 class reactor_backend_osv : public reactor_backend {
300 osv::newpoll::poller _poller;
// Presumably returns a future resolved when notifier `n` fires -- confirm.
301 future<> get_poller_future(reactor_notifier_osv *n);
// Promise with no value; presumably fulfilled when the timer armed via
// enable_timer() expires -- confirm.
302 promise<> _timer_promise;
304 reactor_backend_osv();
305 virtual ~reactor_backend_osv() override { }
// Overrides of the reactor_backend interface.
307 virtual bool reap_kernel_completions() override;
308 virtual bool kernel_submit_work() override;
309 virtual bool kernel_events_can_sleep() const override;
310 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
311 virtual future<> readable(pollable_fd_state& fd) override;
312 virtual future<> writeable(pollable_fd_state& fd) override;
313 virtual void forget(pollable_fd_state& fd) noexcept override;
315 virtual future<std::tuple<pollable_fd, socket_address>>
316 accept(pollable_fd_state& listenfd) override;
317 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
318 virtual void shutdown(pollable_fd_state& fd, int how) override;
319 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
320 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
// The signature must match reactor_backend::write_some(pollable_fd_state&,
// net::packet&); a declaration taking only net::packet& overrides nothing
// and is rejected because of `override`. Any out-of-line definition must use
// the same parameter list.
321 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
322 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
// Not part of the reactor_backend interface; presumably arms a timer for
// time point `when` -- confirm.
324 void enable_timer(steady_clock_type::time_point when);
325 virtual pollable_fd_state_ptr
326 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
328 #endif /* HAVE_OSV */
330 class reactor_backend_selector {
333 static bool has_enough_aio_nr();
334 explicit reactor_backend_selector(std::string name) : _name(std::move(name)) {}
336 std::unique_ptr<reactor_backend> create(reactor* r);
337 static reactor_backend_selector default_backend();
338 static std::vector<reactor_backend_selector> available();
339 friend std::ostream& operator<<(std::ostream& os, const reactor_backend_selector& rbs) {
340 return os << rbs._name;
342 friend void validate(boost::any& v, const std::vector<std::string> values, reactor_backend_selector* rbs, int) {
343 namespace bpo = boost::program_options;
344 bpo::validators::check_first_occurrence(v);
345 auto s = bpo::validators::get_single_string(values);
346 for (auto&& x : available()) {
352 throw bpo::validation_error(bpo::validation_error::invalid_option_value);