]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/alien.hh
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / include / seastar / core / alien.hh
1 // -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
2 /*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 /*
20 * Copyright (C) 2018 Red Hat
21 */
22
23 #pragma once
24
25 #include <atomic>
26 #include <deque>
27 #include <future>
28 #include <memory>
29
30 #include <boost/lockfree/queue.hpp>
31
32 #include <seastar/core/future.hh>
33 #include <seastar/core/cacheline.hh>
34 #include <seastar/core/sstring.hh>
35 #include <seastar/core/metrics_registration.hh>
36
37 /// \file
38
39 namespace seastar {
40
41 class reactor;
42
43 /// \brief Integration with non-seastar applications.
44 namespace alien {
45
46 class message_queue {
47 static constexpr size_t batch_size = 128;
48 static constexpr size_t prefetch_cnt = 2;
49 struct work_item;
50 struct lf_queue_remote {
51 reactor* remote;
52 };
53 using lf_queue_base = boost::lockfree::queue<work_item*>;
54 // use inheritence to control placement order
55 struct lf_queue : lf_queue_remote, lf_queue_base {
56 lf_queue(reactor* remote)
57 : lf_queue_remote{remote}, lf_queue_base{batch_size} {}
58 void maybe_wakeup();
59 } _pending;
60 struct alignas(seastar::cache_line_size) {
61 std::atomic<size_t> value{0};
62 } _sent;
63 // keep this between two structures with statistics
64 // this makes sure that they have at least one cache line
65 // between them, so hw prefetcher will not accidentally prefetch
66 // cache line used by another cpu.
67 metrics::metric_groups _metrics;
68 struct alignas(seastar::cache_line_size) {
69 size_t _received = 0;
70 size_t _last_rcv_batch = 0;
71 };
72 struct work_item {
73 virtual ~work_item() = default;
74 virtual void process() = 0;
75 };
76 template <typename Func>
77 struct async_work_item : work_item {
78 Func _func;
79 async_work_item(Func&& func) : _func(std::move(func)) {}
80 void process() override {
81 _func();
82 }
83 };
84 template<typename Func>
85 size_t process_queue(lf_queue& q, Func process);
86 void submit_item(std::unique_ptr<work_item> wi);
87 public:
88 message_queue(reactor *to);
89 void start();
90 void stop();
91 template <typename Func>
92 void submit(Func&& func) {
93 auto wi = std::make_unique<async_work_item<Func>>(std::forward<Func>(func));
94 submit_item(std::move(wi));
95 }
96 size_t process_incoming();
97 bool pure_poll_rx() const;
98 };
99
100 class smp {
101 struct qs_deleter {
102 unsigned count;
103 qs_deleter(unsigned n = 0) : count(n) {}
104 qs_deleter(const qs_deleter& d) : count(d.count) {}
105 void operator()(message_queue* qs) const;
106 };
107 using qs = std::unique_ptr<message_queue[], qs_deleter>;
108 public:
109 static qs create_qs(const std::vector<reactor*>& reactors);
110 static qs _qs;
111 static bool poll_queues();
112 static bool pure_poll_queues();
113 };
114
115 /// Runs a function on a remote shard from an alien thread where engine() is not available.
116 ///
117 /// \param shard designates the shard to run the function on
118 /// \param func a callable to run on shard \c t. If \c func is a temporary object,
119 /// its lifetime will be extended by moving it. If \c func is a reference,
120 /// the caller must guarantee that it will survive the call.
121 /// \note the func must not throw and should return \c void. as we cannot identify the
122 /// alien thread, hence we are not able to post the fulfilled promise to the
123 /// message queue managed by the shard executing the alien thread which is
124 /// interested to the return value. Please use \c submit_to() instead, if
125 /// \c func throws.
126 template <typename Func>
127 void run_on(unsigned shard, Func func) {
128 smp::_qs[shard].submit(std::move(func));
129 }
130
131 namespace internal {
132 template<typename Func>
133 using return_value_t = typename futurize<std::invoke_result_t<Func>>::value_type;
134
135 template<typename Func,
136 bool = std::is_empty_v<return_value_t<Func>>>
137 struct return_type_of {
138 using type = void;
139 static void set(std::promise<void>& p, return_value_t<Func>&&) {
140 p.set_value();
141 }
142 };
143 template<typename Func>
144 struct return_type_of<Func, false> {
145 using return_tuple_t = typename futurize<std::invoke_result_t<Func>>::tuple_type;
146 using type = std::tuple_element_t<0, return_tuple_t>;
147 static void set(std::promise<type>& p, return_value_t<Func>&& t) {
148 #if SEASTAR_API_LEVEL < 5
149 p.set_value(std::get<0>(std::move(t)));
150 #else
151 p.set_value(std::move(t));
152 #endif
153 }
154 };
155 template <typename Func> using return_type_t = typename return_type_of<Func>::type;
156 }
157
158 /// Runs a function on a remote shard from an alien thread where engine() is not available.
159 ///
160 /// \param shard designates the shard to run the function on
161 /// \param func a callable to run on \c shard. If \c func is a temporary object,
162 /// its lifetime will be extended by moving it. If \c func is a reference,
163 /// the caller must guarantee that it will survive the call.
164 /// \return whatever \c func returns, as a \c std::future<>
165 /// \note the caller must keep the returned future alive until \c func returns
166 template<typename Func, typename T = internal::return_type_t<Func>>
167 std::future<T> submit_to(unsigned shard, Func func) {
168 std::promise<T> pr;
169 auto fut = pr.get_future();
170 run_on(shard, [pr = std::move(pr), func = std::move(func)] () mutable {
171 // std::future returned via std::promise above.
172 (void)func().then_wrapped([pr = std::move(pr)] (auto&& result) mutable {
173 try {
174 internal::return_type_of<Func>::set(pr, result.get());
175 } catch (...) {
176 pr.set_exception(std::current_exception());
177 }
178 });
179 });
180 return fut;
181 }
182
183 }
184 }