]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/stream.hh
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / seastar / include / seastar / core / stream.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22 #pragma once
23
24 #include <seastar/core/future.hh>
25 #include <exception>
26 #include <functional>
27 #include <cassert>
28
29 namespace seastar {
30
31 // A stream/subscription pair is similar to a promise/future pair,
32 // but applies to a sequence of values instead of a single value.
33 //
34 // A stream<> is the producer side. It may call produce() as long
35 // as the future<> returned from the previous invocation is ready.
36 // To signify no more data is available, call close().
37 //
38 // A subscription<> is the consumer side. It is created by a call
39 // to stream::listen(). Calling subscription::start(),
40 // which registers the data processing callback, starts processing
41 // events. It may register for end-of-stream notifications by
42 // chaining the done() future, which also delivers error
43 // events (as exceptions).
44 //
45 // The consumer can pause generation of new data by returning
46 // a non-ready future; when the future becomes ready, the producer
47 // will resume processing.
48
// Forward declaration of the producer side of a stream/subscription pair.
template <class... T>
class stream;

// Forward declaration of the consumer side of a stream/subscription pair.
template <class... T>
class subscription;
54
55 template <typename... T>
56 class stream {
57 subscription<T...>* _sub = nullptr;
58 promise<> _done;
59 promise<> _ready;
60 public:
61 using next_fn = std::function<future<> (T...)>;
62 stream() = default;
63 stream(const stream&) = delete;
64 stream(stream&&) = delete;
65 ~stream();
66 void operator=(const stream&) = delete;
67 void operator=(stream&&) = delete;
68
69 // Returns a subscription that reads value from this
70 // stream.
71 subscription<T...> listen();
72
73 // Returns a subscription that reads value from this
74 // stream, and also sets up the listen function.
75 subscription<T...> listen(next_fn next);
76
77 // Becomes ready when the listener is ready to accept
78 // values. Call only once, when beginning to produce
79 // values.
80 future<> started();
81
82 // Produce a value. Call only after started(), and after
83 // a previous produce() is ready.
84 future<> produce(T... data);
85
86 // End the stream. Call only after started(), and after
87 // a previous produce() is ready. No functions may be called
88 // after this.
89 void close();
90
91 // Signal an error. Call only after started(), and after
92 // a previous produce() is ready. No functions may be called
93 // after this.
94 template <typename E>
95 void set_exception(E ex);
96 private:
97 void pause(future<> can_continue);
98 void start();
99 friend class subscription<T...>;
100 };
101
102 template <typename... T>
103 class subscription {
104 public:
105 using next_fn = typename stream<T...>::next_fn;
106 private:
107 stream<T...>* _stream;
108 next_fn _next;
109 private:
110 explicit subscription(stream<T...>* s);
111 public:
112 subscription(subscription&& x);
113 ~subscription();
114
115 /// \brief Start receiving events from the stream.
116 ///
117 /// \param next Callback to call for each event
118 void start(std::function<future<> (T...)> next);
119
120 // Becomes ready when the stream is empty, or when an error
121 // happens (in that case, an exception is held).
122 future<> done();
123
124 friend class stream<T...>;
125 };
126
127
128 template <typename... T>
129 inline
130 stream<T...>::~stream() {
131 if (_sub) {
132 _sub->_stream = nullptr;
133 }
134 }
135
136 template <typename... T>
137 inline
138 subscription<T...>
139 stream<T...>::listen() {
140 return subscription<T...>(this);
141 }
142
143 template <typename... T>
144 inline
145 subscription<T...>
146 stream<T...>::listen(next_fn next) {
147 auto sub = subscription<T...>(this);
148 sub.start(std::move(next));
149 return sub;
150 }
151
152 template <typename... T>
153 inline
154 future<>
155 stream<T...>::started() {
156 return _ready.get_future();
157 }
158
159 template <typename... T>
160 inline
161 future<>
162 stream<T...>::produce(T... data) {
163 auto ret = futurize<void>::apply(_sub->_next, std::move(data)...);
164 if (ret.available() && !ret.failed()) {
165 // Native network stack depends on stream::produce() returning
166 // a ready future to push packets along without dropping. As
167 // a temporary workaround, special case a ready, unfailed future
168 // and return it immediately, so that then_wrapped(), below,
169 // doesn't convert a ready future to an unready one.
170 return ret;
171 }
172 return ret.then_wrapped([this] (auto&& f) {
173 try {
174 f.get();
175 } catch (...) {
176 _done.set_exception(std::current_exception());
177 // FIXME: tell the producer to stop producing
178 throw;
179 }
180 });
181 }
182
183 template <typename... T>
184 inline
185 void
186 stream<T...>::close() {
187 _done.set_value();
188 }
189
190 template <typename... T>
191 template <typename E>
192 inline
193 void
194 stream<T...>::set_exception(E ex) {
195 _done.set_exception(ex);
196 }
197
198 template <typename... T>
199 inline
200 subscription<T...>::subscription(stream<T...>* s)
201 : _stream(s) {
202 assert(!_stream->_sub);
203 _stream->_sub = this;
204 }
205
206 template <typename... T>
207 inline
208 void
209 subscription<T...>::start(std::function<future<> (T...)> next) {
210 _next = std::move(next);
211 _stream->_ready.set_value();
212 }
213
214 template <typename... T>
215 inline
216 subscription<T...>::~subscription() {
217 if (_stream) {
218 _stream->_sub = nullptr;
219 }
220 }
221
222 template <typename... T>
223 inline
224 subscription<T...>::subscription(subscription&& x)
225 : _stream(x._stream), _next(std::move(x._next)) {
226 x._stream = nullptr;
227 if (_stream) {
228 _stream->_sub = this;
229 }
230 }
231
232 template <typename... T>
233 inline
234 future<>
235 subscription<T...>::done() {
236 return _stream->_done.get_future();
237 }
238
239 }