1 // -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 * Copyright (C) 2018 Red Hat
30 #include <boost/lockfree/queue.hpp>
32 #include <seastar/core/future.hh>
33 #include <seastar/core/cacheline.hh>
34 #include <seastar/core/sstring.hh>
35 #include <seastar/core/metrics_registration.hh>
43 /// \brief Integration with non-seastar applications.
// Capacity (in work items) of each cross-shard lock-free queue; also
// presumably the drain batch size -- TODO(review): confirm against
// process_queue(), whose definition is not visible here.
static constexpr size_t batch_size = 128;
// Number of entries to prefetch ahead while draining -- presumably to hide
// cache-miss latency; use site not visible in this excerpt.
static constexpr size_t prefetch_cnt = 2;
// Base carrying the remote (consuming) reactor pointer; kept as a separate
// base class so its placement relative to the queue storage is controlled.
// NOTE(review): the member list is elided in this excerpt.
struct lf_queue_remote {
using lf_queue_base = boost::lockfree::queue<work_item*>;
// use inheritance to control placement order
struct lf_queue : lf_queue_remote, lf_queue_base {
    // Queue is pre-sized to hold one batch of work items.
    lf_queue(reactor* remote)
        : lf_queue_remote{remote}, lf_queue_base{batch_size} {}
// Counter aligned to its own cache line so producer and consumer shards do
// not false-share it.
struct alignas(seastar::cache_line_size) {
    std::atomic<size_t> value{0};
// keep this between the two statistics structures:
// this makes sure that they have at least one cache line
// between them, so the hw prefetcher will not accidentally prefetch
// a cache line used by another cpu.
metrics::metric_groups _metrics;
struct alignas(seastar::cache_line_size) {
    size_t _last_rcv_batch = 0;
// Polymorphic unit of work passed by pointer through the lock-free queue;
// the virtual destructor allows deletion through the base pointer.
virtual ~work_item() = default;
virtual void process() = 0;
// Work item that owns a callable and invokes it from process().
template <typename Func>
struct async_work_item : work_item {
    // Takes ownership of the callable by move.
    async_work_item(Func&& func) : _func(std::move(func)) {}
    void process() override {
// Drain items from `q`, calling `process` on each; returns a count --
// presumably the number of items handled (definition not visible here).
template<typename Func>
size_t process_queue(lf_queue& q, Func process);
// Enqueue a work item for the remote shard; takes ownership of it.
void submit_item(std::unique_ptr<work_item> wi);
// Construct a queue whose consumer is the reactor `to`.
message_queue(reactor *to);
// Wrap `func` in an async_work_item and enqueue it; `func` is forwarded
// into the item, so a temporary's lifetime is extended by the move.
template <typename Func>
void submit(Func&& func) {
    auto wi = std::make_unique<async_work_item<Func>>(std::forward<Func>(func));
    submit_item(std::move(wi));
// Process pending incoming items; returns a count (presumably how many
// were processed -- confirm against the definition).
size_t process_incoming();
// Check for pending receive work without dequeuing anything.
bool pure_poll_rx() const;
// Deleter for the per-shard message_queue array; remembers the element
// count so operator() can destroy every element.
qs_deleter(unsigned n = 0) : count(n) {}
qs_deleter(const qs_deleter& d) : count(d.count) {}
void operator()(message_queue* qs) const;
// Owning handle for the array of per-shard queues.
using qs = std::unique_ptr<message_queue[], qs_deleter>;
// Create one message_queue per reactor in `reactors`.
static qs create_qs(const std::vector<reactor*>& reactors);
// Poll the queues; boolean result presumably indicates whether any work
// was found -- confirm against the definitions.
static bool poll_queues();
static bool pure_poll_queues();
115 /// Runs a function on a remote shard from an alien thread where engine() is not available.
117 /// \param shard designates the shard to run the function on
118 /// \param func a callable to run on shard \c t. If \c func is a temporary object,
119 /// its lifetime will be extended by moving it. If \c func is a reference,
120 /// the caller must guarantee that it will survive the call.
/// \note \c func must not throw and should return \c void. Since we cannot
///       identify the alien thread, we are not able to post the fulfilled
///       promise to the message queue managed by the shard executing the
///       alien thread that is interested in the return value. Please use
///       \c submit_to() instead, if
template <typename Func>
void run_on(unsigned shard, Func func) {
    // `func` was taken by value, so moving it into the target shard's queue
    // extends the lifetime of a temporary passed by the caller.
    smp::_qs[shard].submit(std::move(func));
// The value type that `Func`'s futurized result carries.
template<typename Func>
using return_value_t = typename futurize<std::invoke_result_t<Func>>::value_type;
// Adapts a callable's futurized result to the std::promise API.  The bool
// non-type parameter (is the value type empty?) selects between the primary
// template (promise<void>) and the value-carrying specialization below.
template<typename Func,
         bool = std::is_empty_v<return_value_t<Func>>>
struct return_type_of {
    // Empty result type: nothing to pass; just fulfill the promise.
    static void set(std::promise<void>& p, return_value_t<Func>&&) {
// Non-empty result: extract the value and move it into the promise.
template<typename Func>
struct return_type_of<Func, false> {
    using return_tuple_t = typename futurize<std::invoke_result_t<Func>>::tuple_type;
    using type = std::tuple_element_t<0, return_tuple_t>;
    static void set(std::promise<type>& p, return_value_t<Func>&& t) {
#if SEASTAR_API_LEVEL < 5
        // Older API levels wrap the value in a tuple; unwrap element 0.
        p.set_value(std::get<0>(std::move(t)));
        // API level >= 5 branch: the value is stored directly.
        // NOTE(review): the #else/#endif lines are not visible in this excerpt.
        p.set_value(std::move(t));
// Convenience alias for the unwrapped result type of `Func`.
template <typename Func> using return_type_t = typename return_type_of<Func>::type;
158 /// Runs a function on a remote shard from an alien thread where engine() is not available.
160 /// \param shard designates the shard to run the function on
161 /// \param func a callable to run on \c shard. If \c func is a temporary object,
162 /// its lifetime will be extended by moving it. If \c func is a reference,
163 /// the caller must guarantee that it will survive the call.
164 /// \return whatever \c func returns, as a \c std::future<>
165 /// \note the caller must keep the returned future alive until \c func returns
// T is the callable's result type as unwrapped by internal::return_type_of.
template<typename Func, typename T = internal::return_type_t<Func>>
std::future<T> submit_to(unsigned shard, Func func) {
    // NOTE(review): the declaration of `pr` is elided in this excerpt;
    // presumably it reads `std::promise<T> pr;` -- confirm upstream.
    auto fut = pr.get_future();
    // Move both the promise and the callable onto the target shard.
    run_on(shard, [pr = std::move(pr), func = std::move(func)] () mutable {
        // std::future returned via std::promise above.
        (void)func().then_wrapped([pr = std::move(pr)] (auto&& result) mutable {
            // result.get() rethrows on failure; on success the value is
            // forwarded into the promise...
            internal::return_type_of<Func>::set(pr, result.get());
            // ...otherwise the in-flight exception is propagated instead
            // (presumably inside a catch block elided from this excerpt).
            pr.set_exception(std::current_exception());