]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/reactor.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / reactor.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2014 Cloudius Systems
20 */
21
22 #pragma once
23
24 #include <seastar/core/seastar.hh>
25 #include <seastar/core/iostream.hh>
26 #include <seastar/core/aligned_buffer.hh>
27 #include <seastar/core/cacheline.hh>
28 #include <seastar/core/circular_buffer_fixed_capacity.hh>
29 #include <seastar/core/idle_cpu_handler.hh>
30 #include <memory>
31 #include <type_traits>
32 #include <sys/epoll.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <unordered_map>
36 #include <netinet/ip.h>
37 #include <cstring>
38 #include <cassert>
39 #include <stdexcept>
40 #include <unistd.h>
41 #include <vector>
42 #include <queue>
43 #include <algorithm>
44 #include <thread>
45 #include <system_error>
46 #include <chrono>
47 #include <ratio>
48 #include <atomic>
49 #include <stack>
50 #include <seastar/util/std-compat.hh>
51 #include <boost/next_prior.hpp>
52 #include <boost/lockfree/spsc_queue.hpp>
53 #include <boost/thread/barrier.hpp>
54 #include <boost/container/static_vector.hpp>
55 #include <set>
56 #include <seastar/core/reactor_config.hh>
57 #include <seastar/core/linux-aio.hh>
58 #include <seastar/util/eclipse.hh>
59 #include <seastar/core/future.hh>
60 #include <seastar/core/posix.hh>
61 #include <seastar/core/sstring.hh>
62 #include <seastar/net/api.hh>
63 #include <seastar/core/temporary_buffer.hh>
64 #include <seastar/core/circular_buffer.hh>
65 #include <seastar/core/file.hh>
66 #include <seastar/core/semaphore.hh>
67 #include <seastar/core/fair_queue.hh>
68 #include <seastar/core/scattered_message.hh>
69 #include <seastar/core/enum.hh>
70 #include <seastar/core/memory.hh>
71 #include <seastar/core/thread_cputime_clock.hh>
72 #include <boost/range/irange.hpp>
73 #include <seastar/core/timer.hh>
74 #include <seastar/core/condition-variable.hh>
75 #include <seastar/util/log.hh>
76 #include <seastar/core/lowres_clock.hh>
77 #include <seastar/core/manual_clock.hh>
78 #include <seastar/core/metrics_registration.hh>
79 #include <seastar/core/scheduling.hh>
80 #include <seastar/core/scheduling_specific.hh>
81 #include <seastar/core/smp.hh>
82 #include <seastar/core/internal/io_request.hh>
83 #include <seastar/core/internal/io_sink.hh>
84 #include <seastar/core/make_task.hh>
85 #include "internal/pollable_fd.hh"
86 #include "internal/poll.hh"
87
88 #ifdef HAVE_OSV
89 #include <osv/sched.hh>
90 #include <osv/mutex.h>
91 #include <osv/condvar.h>
92 #include <osv/newpoll.hh>
93 #endif
94
// Forward declaration of the C++ ABI unwinder's exception object type.
// NOTE(review): nothing in the visible part of this header uses it — confirm it is still needed.
struct _Unwind_Exception;

// Types of the alien (non-seastar thread) messaging API, defined elsewhere.
namespace seastar::alien {
class instance;
class message_queue;
}

namespace seastar {

/// Numeric identifier of a shard.
using shard_id = unsigned;

// Declared up front so the declarations below can refer to it before its definition.
class reactor;

}
108
namespace std {

/// Hash support for IPv4 socket addresses so they can key unordered containers.
///
/// The port and the IPv4 address are combined with XOR, using the raw stored
/// values (no byte-order conversion is performed).
template <>
struct hash<::sockaddr_in> {
    size_t operator()(::sockaddr_in a) const {
        const auto ip = a.sin_addr.s_addr;
        const auto port = a.sin_port;
        return ip ^ port;
    }
};

}

/// Equality for IPv4 socket addresses; defined out of line (not visible in this header).
bool operator==(::sockaddr_in a, ::sockaddr_in b);
121
122 namespace seastar {
123
// Collaborators of the reactor that are defined in other headers; only
// forward declarations are needed here.
class reactor_backend;
class reactor_backend_selector;
class smp;
class thread_pool;
131 namespace internal {
132
133 class reactor_stall_sampler;
134 class cpu_stall_detector;
135 class buffer_allocator;
136
137 template <typename Func> // signature: bool ()
138 std::unique_ptr<pollfn> make_pollfn(Func&& func);
139
140 class poller {
141 std::unique_ptr<pollfn> _pollfn;
142 class registration_task;
143 class deregistration_task;
144 registration_task* _registration_task = nullptr;
145 public:
146 template <typename Func> // signature: bool ()
147 static poller simple(Func&& poll) {
148 return poller(make_pollfn(std::forward<Func>(poll)));
149 }
150 poller(std::unique_ptr<pollfn> fn)
151 : _pollfn(std::move(fn)) {
152 do_register();
153 }
154 ~poller();
155 poller(poller&& x) noexcept;
156 poller& operator=(poller&& x) noexcept;
157 void do_register() noexcept;
158 friend class reactor;
159 };
160
161 size_t scheduling_group_count();
162
163 void increase_thrown_exceptions_counter() noexcept;
164
165 }
166
167 class kernel_completion;
168 class io_queue;
169 class io_intent;
170 class disk_config_params;
171
172 class io_completion : public kernel_completion {
173 public:
174 virtual void complete_with(ssize_t res) final override;
175
176 virtual void complete(size_t res) noexcept = 0;
177 virtual void set_exception(std::exception_ptr eptr) noexcept = 0;
178 };
179
180 class reactor {
181 private:
182 struct task_queue;
183 using task_queue_list = circular_buffer_fixed_capacity<task_queue*, 1 << log2ceil(max_scheduling_groups())>;
184 using pollfn = seastar::pollfn;
185
186 class signal_pollfn;
187 class batch_flush_pollfn;
188 class smp_pollfn;
189 class drain_cross_cpu_freelist_pollfn;
190 class lowres_timer_pollfn;
191 class manual_timer_pollfn;
192 class epoll_pollfn;
193 class reap_kernel_completions_pollfn;
194 class kernel_submit_work_pollfn;
195 class io_queue_submission_pollfn;
196 class syscall_pollfn;
197 class execution_stage_pollfn;
198 friend class manual_clock;
199 friend class file_data_source_impl; // for fstream statistics
200 friend class internal::reactor_stall_sampler;
201 friend class preempt_io_context;
202 friend struct hrtimer_aio_completion;
203 friend class reactor_backend_epoll;
204 friend class reactor_backend_aio;
205 friend class reactor_backend_uring;
206 friend class reactor_backend_selector;
207 friend class io_queue; // for aio statistics
208 friend struct reactor_options;
209 friend class aio_storage_context;
210 public:
211 using poller = internal::poller;
212 using idle_cpu_handler_result = seastar::idle_cpu_handler_result;
213 using work_waiting_on_reactor = seastar::work_waiting_on_reactor;
214 using idle_cpu_handler = seastar::idle_cpu_handler;
215
216 struct io_stats {
217 uint64_t aio_reads = 0;
218 uint64_t aio_read_bytes = 0;
219 uint64_t aio_writes = 0;
220 uint64_t aio_write_bytes = 0;
221 uint64_t aio_outsizes = 0;
222 uint64_t aio_errors = 0;
223 uint64_t fstream_reads = 0;
224 uint64_t fstream_read_bytes = 0;
225 uint64_t fstream_reads_blocked = 0;
226 uint64_t fstream_read_bytes_blocked = 0;
227 uint64_t fstream_read_aheads_discarded = 0;
228 uint64_t fstream_read_ahead_discarded_bytes = 0;
229 };
230 /// Scheduling statistics.
231 struct sched_stats {
232 /// Total number of tasks processed by this shard's reactor until this point.
233 /// Note that tasks can be tiny, running for a few nanoseconds, or can take an
234 /// entire task quota.
235 uint64_t tasks_processed = 0;
236 };
237 friend void io_completion::complete_with(ssize_t);
238
239 /// Obtains an alien::instance object that can be used to send messages
240 /// to Seastar shards from non-Seastar threads.
241 alien::instance& alien() { return _alien; }
242
243 private:
244 std::shared_ptr<smp> _smp;
245 alien::instance& _alien;
246 reactor_config _cfg;
247 file_desc _notify_eventfd;
248 file_desc _task_quota_timer;
249 #ifdef HAVE_OSV
250 reactor_backend_osv _backend;
251 sched::thread _timer_thread;
252 sched::thread *_engine_thread;
253 mutable mutex _timer_mutex;
254 condvar _timer_cond;
255 s64 _timer_due = 0;
256 #else
257 std::unique_ptr<reactor_backend> _backend;
258 #endif
259 sigset_t _active_sigmask; // holds sigmask while sleeping with sig disabled
260 std::vector<pollfn*> _pollers;
261
262 static constexpr unsigned max_aio_per_queue = 128;
263 static constexpr unsigned max_queues = 8;
264 static constexpr unsigned max_aio = max_aio_per_queue * max_queues;
265 friend disk_config_params;
266
267 // Each mountpouint is controlled by its own io_queue, but ...
268 std::unordered_map<dev_t, std::unique_ptr<io_queue>> _io_queues;
269 // ... when dispatched all requests get into this single sink
270 internal::io_sink _io_sink;
271 unsigned _num_io_groups = 0;
272
273 std::vector<noncopyable_function<future<> ()>> _exit_funcs;
274 unsigned _id = 0;
275 bool _stopping = false;
276 bool _stopped = false;
277 bool _finished_running_tasks = false;
278 condition_variable _stop_requested;
279 bool _handle_sigint = true;
280 std::optional<future<std::unique_ptr<network_stack>>> _network_stack_ready;
281 int _return = 0;
282 promise<> _start_promise;
283 semaphore _cpu_started;
284 internal::preemption_monitor _preemption_monitor{};
285 uint64_t _global_tasks_processed = 0;
286 uint64_t _polls = 0;
287 std::unique_ptr<internal::cpu_stall_detector> _cpu_stall_detector;
288
289 unsigned _max_task_backlog = 1000;
290 timer_set<timer<>, &timer<>::_link> _timers;
291 timer_set<timer<>, &timer<>::_link>::timer_list_t _expired_timers;
292 timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link> _lowres_timers;
293 timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
294 timer_set<timer<manual_clock>, &timer<manual_clock>::_link> _manual_timers;
295 timer_set<timer<manual_clock>, &timer<manual_clock>::_link>::timer_list_t _expired_manual_timers;
296 io_stats _io_stats;
297 uint64_t _fsyncs = 0;
298 uint64_t _cxx_exceptions = 0;
299 uint64_t _abandoned_failed_futures = 0;
300 struct task_queue {
301 explicit task_queue(unsigned id, sstring name, float shares);
302 int64_t _vruntime = 0;
303 float _shares;
304 int64_t _reciprocal_shares_times_2_power_32;
305 bool _current = false;
306 bool _active = false;
307 uint8_t _id;
308 sched_clock::time_point _ts; // to help calculating wait/starve-times
309 sched_clock::duration _runtime = {};
310 sched_clock::duration _waittime = {};
311 sched_clock::duration _starvetime = {};
312 uint64_t _tasks_processed = 0;
313 circular_buffer<task*> _q;
314 sstring _name;
315 int64_t to_vruntime(sched_clock::duration runtime) const;
316 void set_shares(float shares) noexcept;
317 struct indirect_compare;
318 sched_clock::duration _time_spent_on_task_quota_violations = {};
319 seastar::metrics::metric_groups _metrics;
320 void rename(sstring new_name);
321 private:
322 void register_stats();
323 };
324
325 boost::container::static_vector<std::unique_ptr<task_queue>, max_scheduling_groups()> _task_queues;
326 internal::scheduling_group_specific_thread_local_data _scheduling_group_specific_data;
327 int64_t _last_vruntime = 0;
328 task_queue_list _active_task_queues;
329 task_queue_list _activating_task_queues;
330 task_queue* _at_destroy_tasks;
331 sched_clock::duration _task_quota;
332 task* _current_task = nullptr;
333 /// Handler that will be called when there is no task to execute on cpu.
334 /// It represents a low priority work.
335 ///
336 /// Handler's return value determines whether handler did any actual work. If no work was done then reactor will go
337 /// into sleep.
338 ///
339 /// Handler's argument is a function that returns true if a task which should be executed on cpu appears or false
340 /// otherwise. This function should be used by a handler to return early if a task appears.
341 idle_cpu_handler _idle_cpu_handler{ [] (work_waiting_on_reactor) {return idle_cpu_handler_result::no_more_work;} };
342 std::unique_ptr<network_stack> _network_stack;
343 lowres_clock::time_point _lowres_next_timeout = lowres_clock::time_point::max();
344 std::optional<pollable_fd> _aio_eventfd;
345 const bool _reuseport;
346 circular_buffer<double> _loads;
347 double _load = 0;
348 sched_clock::duration _total_idle{0};
349 sched_clock::duration _total_sleep;
350 sched_clock::time_point _start_time = now();
351 std::chrono::nanoseconds _max_poll_time = calculate_poll_time();
352 output_stream<char>::batch_flush_list_t _flush_batching;
353 std::atomic<bool> _sleeping alignas(seastar::cache_line_size){0};
354 pthread_t _thread_id alignas(seastar::cache_line_size) = pthread_self();
355 bool _strict_o_direct = true;
356 bool _force_io_getevents_syscall = false;
357 bool _bypass_fsync = false;
358 bool _have_aio_fsync = false;
359 bool _kernel_page_cache = false;
360 std::atomic<bool> _dying{false};
361 private:
362 static std::chrono::nanoseconds calculate_poll_time();
363 static void block_notifier(int);
364 bool flush_pending_aio();
365 steady_clock_type::time_point next_pending_aio() const noexcept;
366 bool reap_kernel_completions();
367 bool flush_tcp_batches();
368 void update_lowres_clocks() noexcept;
369 bool do_expire_lowres_timers() noexcept;
370 bool do_check_lowres_timers() const noexcept;
371 void expire_manual_timers() noexcept;
372 void start_aio_eventfd_loop();
373 void stop_aio_eventfd_loop();
374 template <typename T, typename E, typename EnableFunc>
375 void complete_timers(T&, E&, EnableFunc&& enable_fn) noexcept(noexcept(enable_fn()));
376
377 /**
378 * Returns TRUE if all pollers allow blocking.
379 *
380 * @return FALSE if at least one of the blockers requires a non-blocking
381 * execution.
382 */
383 bool poll_once();
384 bool pure_poll_once();
385 public:
386 /// Register a user-defined signal handler
387 void handle_signal(int signo, noncopyable_function<void ()>&& handler);
388 void wakeup();
389
390 private:
391 class signals {
392 public:
393 signals();
394 ~signals();
395
396 bool poll_signal();
397 bool pure_poll_signal() const;
398 void handle_signal(int signo, noncopyable_function<void ()>&& handler);
399 void handle_signal_once(int signo, noncopyable_function<void ()>&& handler);
400 static void action(int signo, siginfo_t* siginfo, void* ignore);
401 static void failed_to_handle(int signo);
402 private:
403 struct signal_handler {
404 signal_handler(int signo, noncopyable_function<void ()>&& handler);
405 noncopyable_function<void ()> _handler;
406 };
407 std::atomic<uint64_t> _pending_signals;
408 std::unordered_map<int, signal_handler> _signal_handlers;
409
410 friend void reactor::handle_signal(int, noncopyable_function<void ()>&&);
411 };
412
413 signals _signals;
414 std::unique_ptr<thread_pool> _thread_pool;
415 friend class thread_pool;
416 friend class thread_context;
417 friend class internal::cpu_stall_detector;
418
419 uint64_t pending_task_count() const;
420 void run_tasks(task_queue& tq);
421 bool have_more_tasks() const;
422 bool posix_reuseport_detect();
423 void run_some_tasks();
424 void activate(task_queue& tq);
425 void insert_active_task_queue(task_queue* tq);
426 task_queue* pop_active_task_queue(sched_clock::time_point now);
427 void insert_activating_task_queues();
428 void account_runtime(task_queue& tq, sched_clock::duration runtime);
429 void account_idle(sched_clock::duration idletime);
430 void allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key);
431 future<> init_scheduling_group(scheduling_group sg, sstring name, float shares);
432 future<> init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg);
433 future<> destroy_scheduling_group(scheduling_group sg) noexcept;
434 uint64_t tasks_processed() const;
435 uint64_t min_vruntime() const;
436 void request_preemption();
437 void start_handling_signal();
438 void reset_preemption_monitor();
439 void service_highres_timer() noexcept;
440
441 future<std::tuple<pollable_fd, socket_address>>
442 do_accept(pollable_fd_state& listen_fd);
443 future<> do_connect(pollable_fd_state& pfd, socket_address& sa);
444
445 future<size_t>
446 do_read(pollable_fd_state& fd, void* buffer, size_t size);
447 future<size_t>
448 do_recvmsg(pollable_fd_state& fd, const std::vector<iovec>& iov);
449 future<temporary_buffer<char>>
450 do_read_some(pollable_fd_state& fd, internal::buffer_allocator* ba);
451
452 future<size_t>
453 do_send(pollable_fd_state& fd, const void* buffer, size_t size);
454 future<size_t>
455 do_sendmsg(pollable_fd_state& fd, net::packet& p);
456
457 future<temporary_buffer<char>>
458 do_recv_some(pollable_fd_state& fd, internal::buffer_allocator* ba);
459
460 int do_run();
461 public:
462 explicit reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg);
463 reactor(const reactor&) = delete;
464 ~reactor();
465 void operator=(const reactor&) = delete;
466
467 static sched_clock::time_point now() noexcept {
468 return sched_clock::now();
469 }
470 sched_clock::duration uptime() {
471 return now() - _start_time;
472 }
473
474 io_queue& get_io_queue(dev_t devid = 0) {
475 auto queue = _io_queues.find(devid);
476 if (queue == _io_queues.end()) {
477 return *_io_queues.at(0);
478 } else {
479 return *(queue->second);
480 }
481 }
482
483 [[deprecated("Use io_priority_class::register_one")]]
484 io_priority_class register_one_priority_class(sstring name, uint32_t shares);
485
486 [[deprecated("Use io_priority_class.update_shares")]]
487 future<> update_shares_for_class(io_priority_class pc, uint32_t shares);
488 /// @private
489 void update_shares_for_queues(io_priority_class pc, uint32_t shares);
490 /// @private
491 future<> update_bandwidth_for_queues(io_priority_class pc, uint64_t bandwidth);
492
493 [[deprecated("Use io_priority_class.rename")]]
494 static future<> rename_priority_class(io_priority_class pc, sstring new_name) noexcept;
495 /// @private
496 void rename_queues(io_priority_class pc, sstring new_name);
497
498 void configure(const reactor_options& opts);
499
500 server_socket listen(socket_address sa, listen_options opts = {});
501
502 future<connected_socket> connect(socket_address sa);
503 future<connected_socket> connect(socket_address, socket_address, transport proto = transport::TCP);
504
505 pollable_fd posix_listen(socket_address sa, listen_options opts = {});
506
507 bool posix_reuseport_available() const { return _reuseport; }
508
509 pollable_fd make_pollable_fd(socket_address sa, int proto);
510
511 future<> posix_connect(pollable_fd pfd, socket_address sa, socket_address local);
512
513 future<> send_all(pollable_fd_state& fd, const void* buffer, size_t size);
514
515 future<file> open_file_dma(std::string_view name, open_flags flags, file_open_options options = {}) noexcept;
516 future<file> open_directory(std::string_view name) noexcept;
517 future<> make_directory(std::string_view name, file_permissions permissions = file_permissions::default_dir_permissions) noexcept;
518 future<> touch_directory(std::string_view name, file_permissions permissions = file_permissions::default_dir_permissions) noexcept;
519 future<std::optional<directory_entry_type>> file_type(std::string_view name, follow_symlink = follow_symlink::yes) noexcept;
520 future<stat_data> file_stat(std::string_view pathname, follow_symlink) noexcept;
521 future<uint64_t> file_size(std::string_view pathname) noexcept;
522 future<bool> file_accessible(std::string_view pathname, access_flags flags) noexcept;
523 future<bool> file_exists(std::string_view pathname) noexcept {
524 return file_accessible(pathname, access_flags::exists);
525 }
526 future<fs_type> file_system_at(std::string_view pathname) noexcept;
527 future<struct statvfs> statvfs(std::string_view pathname) noexcept;
528 future<> remove_file(std::string_view pathname) noexcept;
529 future<> rename_file(std::string_view old_pathname, std::string_view new_pathname) noexcept;
530 future<> link_file(std::string_view oldpath, std::string_view newpath) noexcept;
531 future<> chmod(std::string_view name, file_permissions permissions) noexcept;
532
533 future<int> inotify_add_watch(int fd, std::string_view path, uint32_t flags);
534
535 future<std::tuple<file_desc, file_desc>> make_pipe();
536 future<std::tuple<pid_t, file_desc, file_desc, file_desc>>
537 spawn(std::string_view pathname,
538 std::vector<sstring> argv,
539 std::vector<sstring> env = {});
540 future<int> waitpid(pid_t pid);
541 void kill(pid_t pid, int sig);
542
543 int run() noexcept;
544 void exit(int ret);
545 future<> when_started() { return _start_promise.get_future(); }
546 // The function waits for timeout period for reactor stop notification
547 // which happens on termination signals or call for exit().
548 template <typename Rep, typename Period>
549 future<> wait_for_stop(std::chrono::duration<Rep, Period> timeout) {
550 return _stop_requested.wait(timeout, [this] { return _stopping; });
551 }
552
553 void at_exit(noncopyable_function<future<> ()> func);
554
555 template <typename Func>
556 void at_destroy(Func&& func) {
557 _at_destroy_tasks->_q.push_back(make_task(default_scheduling_group(), std::forward<Func>(func)));
558 }
559
560 task* current_task() const { return _current_task; }
561
562 void add_task(task* t) noexcept;
563 void add_urgent_task(task* t) noexcept;
564
565 /// Set a handler that will be called when there is no task to execute on cpu.
566 /// Handler should do a low priority work.
567 ///
568 /// Handler's return value determines whether handler did any actual work. If no work was done then reactor will go
569 /// into sleep.
570 ///
571 /// Handler's argument is a function that returns true if a task which should be executed on cpu appears or false
572 /// otherwise. This function should be used by a handler to return early if a task appears.
573 void set_idle_cpu_handler(idle_cpu_handler&& handler) {
574 _idle_cpu_handler = std::move(handler);
575 }
576 void force_poll();
577
578 void add_high_priority_task(task*) noexcept;
579
580 network_stack& net() { return *_network_stack; }
581
582 [[deprecated("Use this_shard_id")]]
583 shard_id cpu_id() const;
584
585 void sleep();
586
587 steady_clock_type::duration total_idle_time();
588 steady_clock_type::duration total_busy_time();
589 std::chrono::nanoseconds total_steal_time();
590
591 const io_stats& get_io_stats() const { return _io_stats; }
592 /// Returns statistics related to scheduling. The statistics are
593 /// local to this shard.
594 ///
595 /// See \ref sched_stats for a description of individual statistics.
596 /// \return An object containing a snapshot of the statistics at this point in time.
597 sched_stats get_sched_stats() const;
598 uint64_t abandoned_failed_futures() const { return _abandoned_failed_futures; }
599 #ifdef HAVE_OSV
600 void timer_thread_func();
601 void set_timer(sched::timer &tmr, s64 t);
602 #endif
603 private:
604 /**
605 * Add a new "poller" - a non-blocking function returning a boolean, that
606 * will be called every iteration of a main loop.
607 * If it returns FALSE then reactor's main loop is forbidden to block in the
608 * current iteration.
609 *
610 * @param fn a new "poller" function to register
611 */
612 void register_poller(pollfn* p);
613 void unregister_poller(pollfn* p);
614 void replace_poller(pollfn* old, pollfn* neww);
615 void register_metrics();
616 future<> send_all_part(pollable_fd_state& fd, const void* buffer, size_t size, size_t completed);
617
618 future<> fdatasync(int fd) noexcept;
619
620 void add_timer(timer<steady_clock_type>*) noexcept;
621 bool queue_timer(timer<steady_clock_type>*) noexcept;
622 void del_timer(timer<steady_clock_type>*) noexcept;
623 void add_timer(timer<lowres_clock>*) noexcept;
624 bool queue_timer(timer<lowres_clock>*) noexcept;
625 void del_timer(timer<lowres_clock>*) noexcept;
626 void add_timer(timer<manual_clock>*) noexcept;
627 bool queue_timer(timer<manual_clock>*) noexcept;
628 void del_timer(timer<manual_clock>*) noexcept;
629
630 future<> run_exit_tasks();
631 void stop();
632 friend class alien::message_queue;
633 friend class pollable_fd;
634 friend class pollable_fd_state;
635 friend struct pollable_fd_state_deleter;
636 friend class posix_file_impl;
637 friend class blockdev_file_impl;
638 friend class timer<>;
639 friend class timer<lowres_clock>;
640 friend class timer<manual_clock>;
641 friend class smp;
642 friend class smp_message_queue;
643 friend class internal::poller;
644 friend class scheduling_group;
645 friend void add_to_flush_poller(output_stream<char>& os) noexcept;
646 friend void seastar::internal::increase_thrown_exceptions_counter() noexcept;
647 friend void report_failed_future(const std::exception_ptr& eptr) noexcept;
648 friend void with_allow_abandoned_failed_futures(unsigned count, noncopyable_function<void ()> func);
649 metrics::metric_groups _metric_groups;
650 friend future<scheduling_group> create_scheduling_group(sstring name, float shares) noexcept;
651 friend future<> seastar::destroy_scheduling_group(scheduling_group) noexcept;
652 friend future<> seastar::rename_scheduling_group(scheduling_group sg, sstring new_name) noexcept;
653 friend future<scheduling_group_key> scheduling_group_key_create(scheduling_group_key_config cfg) noexcept;
654
655 template<typename T>
656 friend T* internal::scheduling_group_get_specific_ptr(scheduling_group sg, scheduling_group_key key) noexcept;
657 template<typename SpecificValType, typename Mapper, typename Reducer, typename Initial>
658 SEASTAR_CONCEPT( requires requires(SpecificValType specific_val, Mapper mapper, Reducer reducer, Initial initial) {
659 {reducer(initial, mapper(specific_val))} -> std::convertible_to<Initial>;
660 })
661 friend future<typename function_traits<Reducer>::return_type>
662 map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer, Initial initial_val, scheduling_group_key key);
663 template<typename SpecificValType, typename Reducer, typename Initial>
664 SEASTAR_CONCEPT( requires requires(SpecificValType specific_val, Reducer reducer, Initial initial) {
665 {reducer(initial, specific_val)} -> std::convertible_to<Initial>;
666 })
667 friend future<typename function_traits<Reducer>::return_type>
668 reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, scheduling_group_key key);
669
670 future<struct stat> fstat(int fd) noexcept;
671 future<struct statfs> fstatfs(int fd) noexcept;
672 friend future<shared_ptr<file_impl>> make_file_impl(int fd, file_open_options options, int flags) noexcept;
673 public:
674 future<> readable(pollable_fd_state& fd);
675 future<> writeable(pollable_fd_state& fd);
676 future<> readable_or_writeable(pollable_fd_state& fd);
677 future<> poll_rdhup(pollable_fd_state& fd);
678 void enable_timer(steady_clock_type::time_point when) noexcept;
679 /// Sets the "Strict DMA" flag.
680 ///
681 /// When true (default), file I/O operations must use DMA. This is
682 /// the most performant option, but does not work on some file systems
683 /// such as tmpfs or aufs (used in some Docker setups).
684 ///
685 /// When false, file I/O operations can fall back to buffered I/O if
686 /// DMA is not available. This can result in dramatic reducation in
687 /// performance and an increase in memory consumption.
688 void set_strict_dma(bool value);
689 void set_bypass_fsync(bool value);
690 void update_blocked_reactor_notify_ms(std::chrono::milliseconds ms);
691 std::chrono::milliseconds get_blocked_reactor_notify_ms() const;
692 /// For testing, sets the stall reporting function which is called when
693 /// a stall is detected (and not suppressed). Setting the function also
694 /// resets the supression state.
695 void set_stall_detector_report_function(std::function<void ()> report);
696 std::function<void ()> get_stall_detector_report_function() const;
697 };
698
699 template <typename Func> // signature: bool ()
700 inline
701 std::unique_ptr<seastar::pollfn>
702 internal::make_pollfn(Func&& func) {
703 struct the_pollfn : simple_pollfn<false> {
704 the_pollfn(Func&& func) : func(std::forward<Func>(func)) {}
705 Func func;
706 virtual bool poll() override final {
707 return func();
708 }
709 };
710 return std::make_unique<the_pollfn>(std::forward<Func>(func));
711 }
712
713 extern __thread reactor* local_engine;
714 extern __thread size_t task_quota;
715
716 inline reactor& engine() {
717 return *local_engine;
718 }
719
720 inline bool engine_is_ready() {
721 return local_engine != nullptr;
722 }
723
/// Signal number used for the high-resolution timer.
///
/// SIGALRM is deliberately avoided because the boost unit test library also
/// uses it. The first real-time signal (SIGRTMIN) is used instead.
inline int hrtimer_signal() {
    const int first_realtime_signal = SIGRTMIN;
    return first_realtime_signal;
}
729
730
// Logger object declared here and defined out of line (the definition is not
// visible in this header). NOTE(review): presumably the general-purpose logger
// for seastar core diagnostics — confirm against its definition.
extern logger seastar_logger;
732
733 }