1 // -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 * Copyright (C) 2018 Red Hat
30 #include <boost/lockfree/queue.hpp>
32 #include <seastar/core/future.hh>
33 #include <seastar/core/cacheline.hh>
34 #include <seastar/core/sstring.hh>
35 #include <seastar/core/metrics_registration.hh>
43 /// \brief Integration with non-seastar applications.
47 static constexpr size_t batch_size = 128;
48 static constexpr size_t prefetch_cnt = 2;
50 struct lf_queue_remote {
53 using lf_queue_base = boost::lockfree::queue<work_item*>;
54 // use inheritence to control placement order
55 struct lf_queue : lf_queue_remote, lf_queue_base {
56 lf_queue(reactor* remote)
57 : lf_queue_remote{remote}, lf_queue_base{batch_size} {}
60 struct alignas(seastar::cache_line_size) {
61 std::atomic<size_t> value{0};
63 // keep this between two structures with statistics
64 // this makes sure that they have at least one cache line
65 // between them, so hw prefetcher will not accidentally prefetch
66 // cache line used by another cpu.
67 metrics::metric_groups _metrics;
68 struct alignas(seastar::cache_line_size) {
70 size_t _last_rcv_batch = 0;
73 virtual ~work_item() = default;
74 virtual void process() = 0;
76 template <typename Func>
77 struct async_work_item : work_item {
79 async_work_item(Func&& func) : _func(std::move(func)) {}
80 void process() override {
84 template<typename Func>
85 size_t process_queue(lf_queue& q, Func process);
86 void submit_item(std::unique_ptr<work_item> wi);
88 message_queue(reactor *to);
91 template <typename Func>
92 void submit(Func&& func) {
93 auto wi = std::make_unique<async_work_item<Func>>(std::forward<Func>(func));
94 submit_item(std::move(wi));
96 size_t process_incoming();
97 bool pure_poll_rx() const;
104 qs_deleter(unsigned n = 0) : count(n) {}
105 qs_deleter(const qs_deleter& d) : count(d.count) {}
106 void operator()(message_queue* qs) const;
111 /// Represents the Seastar system from alien's point of view. In a normal
112 /// system, there is just one instance, but for in-process clustering testing
113 /// there may be more than one. Functions such as run_on() direct messages to
114 /// an (instance, shard) tuple.
116 using qs = std::unique_ptr<message_queue[], internal::qs_deleter>;
118 static qs create_qs(const std::vector<reactor*>& reactors);
121 bool pure_poll_queues();
126 extern instance* default_instance;
130 /// Runs a function on a remote shard from an alien thread where engine() is not available.
132 /// \param instance designates the Seastar instance to process the message
133 /// \param shard designates the shard to run the function on
134 /// \param func a callable to run on shard \c t. If \c func is a temporary object,
135 /// its lifetime will be extended by moving it. If \c func is a reference,
136 /// the caller must guarantee that it will survive the call.
137 /// \note the func must not throw and should return \c void, as we cannot identify the
138 ///       alien thread, and hence are not able to post the fulfilled promise to the
139 ///       message queue managed by the shard executing the alien thread which is
140 ///       interested in the return value. Please use \c submit_to() instead, if
142 template <typename Func>
143 void run_on(instance& instance, unsigned shard, Func func) {
144 instance._qs[shard].submit(std::move(func));
147 /// Runs a function on a remote shard from an alien thread where engine() is not available.
149 /// \param shard designates the shard to run the function on
150 /// \param func a callable to run on shard \c t. If \c func is a temporary object,
151 /// its lifetime will be extended by moving it. If \c func is a reference,
152 /// the caller must guarantee that it will survive the call.
153 /// \note the func must not throw and should return \c void, as we cannot identify the
154 ///       alien thread, and hence are not able to post the fulfilled promise to the
155 ///       message queue managed by the shard executing the alien thread which is
156 ///       interested in the return value. Please use \c submit_to() instead, if
158 template <typename Func>
159 [[deprecated("Use run_on(instance&, unsigned shard, Func) instead")]]
160 void run_on(unsigned shard, Func func) {
161 run_on(*internal::default_instance, shard, std::move(func));
165 template<typename Func>
166 using return_value_t = typename futurize<std::invoke_result_t<Func>>::value_type;
168 template<typename Func,
169 bool = std::is_empty_v<return_value_t<Func>>>
170 struct return_type_of {
172 static void set(std::promise<void>& p, return_value_t<Func>&&) {
176 template<typename Func>
177 struct return_type_of<Func, false> {
178 using return_tuple_t = typename futurize<std::invoke_result_t<Func>>::tuple_type;
179 using type = std::tuple_element_t<0, return_tuple_t>;
180 static void set(std::promise<type>& p, return_value_t<Func>&& t) {
181 #if SEASTAR_API_LEVEL < 5
182 p.set_value(std::get<0>(std::move(t)));
184 p.set_value(std::move(t));
188 template <typename Func> using return_type_t = typename return_type_of<Func>::type;
191 /// Runs a function on a remote shard from an alien thread where engine() is not available.
193 /// \param instance designates the Seastar instance to process the message
194 /// \param shard designates the shard to run the function on
195 /// \param func a callable to run on \c shard. If \c func is a temporary object,
196 /// its lifetime will be extended by moving it. If \c func is a reference,
197 /// the caller must guarantee that it will survive the call.
198 /// \return whatever \c func returns, as a \c std::future<>
199 /// \note the caller must keep the returned future alive until \c func returns
200 template<typename Func, typename T = internal::return_type_t<Func>>
201 std::future<T> submit_to(instance& instance, unsigned shard, Func func) {
203 auto fut = pr.get_future();
204 run_on(instance, shard, [pr = std::move(pr), func = std::move(func)] () mutable {
205 // std::future returned via std::promise above.
206 (void)func().then_wrapped([pr = std::move(pr)] (auto&& result) mutable {
208 internal::return_type_of<Func>::set(pr, result.get());
210 pr.set_exception(std::current_exception());
217 /// Runs a function on a remote shard from an alien thread where engine() is not available.
219 /// \param shard designates the shard to run the function on
220 /// \param func a callable to run on \c shard. If \c func is a temporary object,
221 /// its lifetime will be extended by moving it. If \c func is a reference,
222 /// the caller must guarantee that it will survive the call.
223 /// \return whatever \c func returns, as a \c std::future<>
224 /// \note the caller must keep the returned future alive until \c func returns
225 template<typename Func, typename T = internal::return_type_t<Func>>
226 [[deprecated("Use submit_to(instance&, unsigned shard, Func) instead.")]]
227 std::future<T> submit_to(unsigned shard, Func func) {
228 return submit_to(*internal::default_instance, shard, std::move(func));