]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/alien.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / core / alien.hh
1 // -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
2 /*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 /*
20 * Copyright (C) 2018 Red Hat
21 */
22
23 #pragma once
24
25 #include <atomic>
26 #include <deque>
27 #include <future>
28 #include <memory>
29
30 #include <boost/lockfree/queue.hpp>
31
32 #include <seastar/core/future.hh>
33 #include <seastar/core/cacheline.hh>
34 #include <seastar/core/sstring.hh>
35 #include <seastar/core/metrics_registration.hh>
36
37 /// \file
38
39 namespace seastar {
40
41 class reactor;
42
43 /// \brief Integration with non-seastar applications.
44 namespace alien {
45
46 class message_queue {
47 static constexpr size_t batch_size = 128;
48 static constexpr size_t prefetch_cnt = 2;
49 struct work_item;
50 struct lf_queue_remote {
51 reactor* remote;
52 };
53 using lf_queue_base = boost::lockfree::queue<work_item*>;
54 // use inheritence to control placement order
55 struct lf_queue : lf_queue_remote, lf_queue_base {
56 lf_queue(reactor* remote)
57 : lf_queue_remote{remote}, lf_queue_base{batch_size} {}
58 void maybe_wakeup();
59 } _pending;
60 struct alignas(seastar::cache_line_size) {
61 std::atomic<size_t> value{0};
62 } _sent;
63 // keep this between two structures with statistics
64 // this makes sure that they have at least one cache line
65 // between them, so hw prefetcher will not accidentally prefetch
66 // cache line used by another cpu.
67 metrics::metric_groups _metrics;
68 struct alignas(seastar::cache_line_size) {
69 size_t _received = 0;
70 size_t _last_rcv_batch = 0;
71 };
72 struct work_item {
73 virtual ~work_item() = default;
74 virtual void process() = 0;
75 };
76 template <typename Func>
77 struct async_work_item : work_item {
78 Func _func;
79 async_work_item(Func&& func) : _func(std::move(func)) {}
80 void process() override {
81 _func();
82 }
83 };
84 template<typename Func>
85 size_t process_queue(lf_queue& q, Func process);
86 void submit_item(std::unique_ptr<work_item> wi);
87 public:
88 message_queue(reactor *to);
89 void start();
90 void stop();
91 template <typename Func>
92 void submit(Func&& func) {
93 auto wi = std::make_unique<async_work_item<Func>>(std::forward<Func>(func));
94 submit_item(std::move(wi));
95 }
96 size_t process_incoming();
97 bool pure_poll_rx() const;
98 };
99
100 namespace internal {
101
102 struct qs_deleter {
103 unsigned count;
104 qs_deleter(unsigned n = 0) : count(n) {}
105 qs_deleter(const qs_deleter& d) : count(d.count) {}
106 void operator()(message_queue* qs) const;
107 };
108
109 }
110
111 /// Represents the Seastar system from alien's point of view. In a normal
112 /// system, there is just one instance, but for in-process clustering testing
113 /// there may be more than one. Function such as run_on() direct messages to
114 /// and (instance, shard) tuple.
115 class instance {
116 using qs = std::unique_ptr<message_queue[], internal::qs_deleter>;
117 public:
118 static qs create_qs(const std::vector<reactor*>& reactors);
119 qs _qs;
120 bool poll_queues();
121 bool pure_poll_queues();
122 };
123
124 namespace internal {
125
126 extern instance* default_instance;
127
128 }
129
130 /// Runs a function on a remote shard from an alien thread where engine() is not available.
131 ///
132 /// \param instance designates the Seastar instance to process the message
133 /// \param shard designates the shard to run the function on
134 /// \param func a callable to run on shard \c t. If \c func is a temporary object,
135 /// its lifetime will be extended by moving it. If \c func is a reference,
136 /// the caller must guarantee that it will survive the call.
137 /// \note the func must not throw and should return \c void. as we cannot identify the
138 /// alien thread, hence we are not able to post the fulfilled promise to the
139 /// message queue managed by the shard executing the alien thread which is
140 /// interested to the return value. Please use \c submit_to() instead, if
141 /// \c func throws.
142 template <typename Func>
143 void run_on(instance& instance, unsigned shard, Func func) {
144 instance._qs[shard].submit(std::move(func));
145 }
146
147 /// Runs a function on a remote shard from an alien thread where engine() is not available.
148 ///
149 /// \param shard designates the shard to run the function on
150 /// \param func a callable to run on shard \c t. If \c func is a temporary object,
151 /// its lifetime will be extended by moving it. If \c func is a reference,
152 /// the caller must guarantee that it will survive the call.
153 /// \note the func must not throw and should return \c void. as we cannot identify the
154 /// alien thread, hence we are not able to post the fulfilled promise to the
155 /// message queue managed by the shard executing the alien thread which is
156 /// interested to the return value. Please use \c submit_to() instead, if
157 /// \c func throws.
158 template <typename Func>
159 [[deprecated("Use run_on(instance&, unsigned shard, Func) instead")]]
160 void run_on(unsigned shard, Func func) {
161 run_on(*internal::default_instance, shard, std::move(func));
162 }
163
164 namespace internal {
165 template<typename Func>
166 using return_value_t = typename futurize<std::invoke_result_t<Func>>::value_type;
167
168 template<typename Func,
169 bool = std::is_empty_v<return_value_t<Func>>>
170 struct return_type_of {
171 using type = void;
172 static void set(std::promise<void>& p, return_value_t<Func>&&) {
173 p.set_value();
174 }
175 };
176 template<typename Func>
177 struct return_type_of<Func, false> {
178 using return_tuple_t = typename futurize<std::invoke_result_t<Func>>::tuple_type;
179 using type = std::tuple_element_t<0, return_tuple_t>;
180 static void set(std::promise<type>& p, return_value_t<Func>&& t) {
181 #if SEASTAR_API_LEVEL < 5
182 p.set_value(std::get<0>(std::move(t)));
183 #else
184 p.set_value(std::move(t));
185 #endif
186 }
187 };
188 template <typename Func> using return_type_t = typename return_type_of<Func>::type;
189 }
190
191 /// Runs a function on a remote shard from an alien thread where engine() is not available.
192 ///
193 /// \param instance designates the Seastar instance to process the message
194 /// \param shard designates the shard to run the function on
195 /// \param func a callable to run on \c shard. If \c func is a temporary object,
196 /// its lifetime will be extended by moving it. If \c func is a reference,
197 /// the caller must guarantee that it will survive the call.
198 /// \return whatever \c func returns, as a \c std::future<>
199 /// \note the caller must keep the returned future alive until \c func returns
200 template<typename Func, typename T = internal::return_type_t<Func>>
201 std::future<T> submit_to(instance& instance, unsigned shard, Func func) {
202 std::promise<T> pr;
203 auto fut = pr.get_future();
204 run_on(instance, shard, [pr = std::move(pr), func = std::move(func)] () mutable {
205 // std::future returned via std::promise above.
206 (void)func().then_wrapped([pr = std::move(pr)] (auto&& result) mutable {
207 try {
208 internal::return_type_of<Func>::set(pr, result.get());
209 } catch (...) {
210 pr.set_exception(std::current_exception());
211 }
212 });
213 });
214 return fut;
215 }
216
217 /// Runs a function on a remote shard from an alien thread where engine() is not available.
218 ///
219 /// \param shard designates the shard to run the function on
220 /// \param func a callable to run on \c shard. If \c func is a temporary object,
221 /// its lifetime will be extended by moving it. If \c func is a reference,
222 /// the caller must guarantee that it will survive the call.
223 /// \return whatever \c func returns, as a \c std::future<>
224 /// \note the caller must keep the returned future alive until \c func returns
225 template<typename Func, typename T = internal::return_type_t<Func>>
226 [[deprecated("Use submit_to(instance&, unsigned shard, Func) instead.")]]
227 std::future<T> submit_to(unsigned shard, Func func) {
228 return submit_to(*internal::default_instance, shard, std::move(func));
229 }
230
231 }
232 }