]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2015 Cloudius Systems, Ltd. | |
20 | */ | |
21 | ||
22 | #pragma once | |
23 | ||
24 | #include <seastar/core/future.hh> | |
25 | #include <seastar/core/queue.hh> | |
26 | ||
27 | #include <seastar/util/std-compat.hh> | |
28 | ||
29 | /// \defgroup fiber-module Fibers | |
30 | /// | |
31 | /// \brief Fibers of execution | |
32 | /// | |
33 | /// Seastar continuations are normally short, but often chained to one | |
34 | /// another, so that one continuation does a bit of work and then schedules | |
35 | /// another continuation for later. Such chains can be long, and often even | |
36 | /// involve loopings - see for example \ref repeat. We call such chains | |
37 | /// "fibers" of execution. | |
38 | /// | |
39 | /// These fibers are not threads - each is just a string of continuations - | |
40 | /// but they share some common requirements with traditional threads. | |
41 | /// For example, we want to avoid one fiber getting starved while a second | |
42 | /// fiber continuously runs its continuations one after another. | |
43 | /// As another example, fibers may want to communicate - e.g., one fiber | |
44 | /// produces data that a second fiber consumes, and we wish to ensure that | |
45 | /// both fibers get a chance to run, and that if one stops prematurely, | |
46 | /// the other doesn't hang forever. | |
47 | /// | |
48 | /// Consult the following table to see which APIs are useful for fiber tasks: | |
49 | /// | |
50 | /// Task | APIs | |
51 | /// -----------------------------------------------|------------------- | |
52 | /// Repeat a blocking task indefinitely | \ref keep_doing() | |
53 | /// Repeat a blocking task, then exit | \ref repeat(), \ref do_until() | |
54 | /// Provide mutual exclusion between two tasks | \ref semaphore, \ref shared_mutex | |
55 | /// Pass a stream of data between two fibers | \ref seastar::pipe | |
f67539c2 | 56 | /// Safely shut down a resource | \ref seastar::gate |
11fdf7f2 TL |
57 | /// Hold on to an object while a fiber is running | \ref do_with() |
58 | /// | |
59 | ||
60 | /// Seastar API namespace | |
61 | namespace seastar { | |
62 | ||
63 | /// \addtogroup fiber-module | |
64 | /// @{ | |
65 | ||
66 | class broken_pipe_exception : public std::exception { | |
67 | public: | |
68 | virtual const char* what() const noexcept { | |
69 | return "Broken pipe"; | |
70 | } | |
71 | }; | |
72 | ||
73 | class unread_overflow_exception : public std::exception { | |
74 | public: | |
75 | virtual const char* what() const noexcept { | |
76 | return "pipe_reader::unread() overflow"; | |
77 | } | |
78 | }; | |
79 | ||
80 | /// \cond internal | |
81 | namespace internal { | |
82 | template <typename T> | |
83 | class pipe_buffer { | |
84 | private: | |
f67539c2 | 85 | queue<std::optional<T>> _buf; |
11fdf7f2 TL |
86 | bool _read_open = true; |
87 | bool _write_open = true; | |
88 | public: | |
89 | pipe_buffer(size_t size) : _buf(size) {} | |
f67539c2 | 90 | future<std::optional<T>> read() { |
11fdf7f2 TL |
91 | return _buf.pop_eventually(); |
92 | } | |
93 | future<> write(T&& data) { | |
94 | return _buf.push_eventually(std::move(data)); | |
95 | } | |
96 | bool readable() const { | |
97 | return _write_open || !_buf.empty(); | |
98 | } | |
99 | bool writeable() const { | |
100 | return _read_open; | |
101 | } | |
102 | bool close_read() { | |
103 | // If a writer blocking (on a full queue), need to stop it. | |
104 | if (_buf.full()) { | |
105 | _buf.abort(std::make_exception_ptr(broken_pipe_exception())); | |
106 | } | |
107 | _read_open = false; | |
108 | return !_write_open; | |
109 | } | |
110 | bool close_write() { | |
111 | // If the queue is empty, write the EOF (disengaged optional) to the | |
112 | // queue to wake a blocked reader. If the queue is not empty, there is | |
113 | // no need to write the EOF to the queue - the reader will return an | |
114 | // EOF when it sees that _write_open == false. | |
115 | if (_buf.empty()) { | |
116 | _buf.push({}); | |
117 | } | |
118 | _write_open = false; | |
119 | return !_read_open; | |
120 | } | |
121 | }; | |
122 | } // namespace internal | |
123 | /// \endcond | |
124 | ||
125 | template <typename T> | |
126 | class pipe; | |
127 | ||
128 | /// \brief Read side of a \ref seastar::pipe | |
129 | /// | |
130 | /// The read side of a pipe, which allows only reading from the pipe. | |
131 | /// A pipe_reader object cannot be created separately, but only as part of a | |
132 | /// reader/writer pair through \ref seastar::pipe. | |
133 | template <typename T> | |
134 | class pipe_reader { | |
135 | private: | |
136 | internal::pipe_buffer<T> *_bufp; | |
f67539c2 | 137 | std::optional<T> _unread; |
20effc67 | 138 | pipe_reader(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { } |
11fdf7f2 TL |
139 | friend class pipe<T>; |
140 | public: | |
141 | /// \brief Read next item from the pipe | |
142 | /// | |
143 | /// Returns a future value, which is fulfilled when the pipe's buffer | |
144 | /// becomes non-empty, or the write side is closed. The value returned | |
145 | /// is an optional<T>, which is disengaged to mark and end of file | |
146 | /// (i.e., the write side was closed, and we've read everything it sent). | |
f67539c2 | 147 | future<std::optional<T>> read() { |
11fdf7f2 TL |
148 | if (_unread) { |
149 | auto ret = std::move(*_unread); | |
150 | _unread = {}; | |
f67539c2 | 151 | return make_ready_future<std::optional<T>>(std::move(ret)); |
11fdf7f2 TL |
152 | } |
153 | if (_bufp->readable()) { | |
154 | return _bufp->read(); | |
155 | } else { | |
f67539c2 | 156 | return make_ready_future<std::optional<T>>(); |
11fdf7f2 TL |
157 | } |
158 | } | |
159 | /// \brief Return an item to the front of the pipe | |
160 | /// | |
161 | /// Pushes the given item to the front of the pipe, so it will be | |
162 | /// returned by the next read() call. The typical use case is to | |
163 | /// unread() the last item returned by read(). | |
164 | /// More generally, it is legal to unread() any item, not just one | |
165 | /// previously returned by read(), but note that the unread() is limited | |
166 | /// to just one item - two calls to unread() without an intervening call | |
167 | /// to read() will cause an exception. | |
168 | void unread(T&& item) { | |
169 | if (_unread) { | |
170 | throw unread_overflow_exception(); | |
171 | } | |
172 | _unread = std::move(item); | |
173 | } | |
174 | ~pipe_reader() { | |
175 | if (_bufp && _bufp->close_read()) { | |
176 | delete _bufp; | |
177 | } | |
178 | } | |
179 | // Allow move, but not copy, of pipe_reader | |
20effc67 | 180 | pipe_reader(pipe_reader&& other) noexcept : _bufp(other._bufp) { |
11fdf7f2 TL |
181 | other._bufp = nullptr; |
182 | } | |
20effc67 | 183 | pipe_reader& operator=(pipe_reader&& other) noexcept { |
11fdf7f2 TL |
184 | std::swap(_bufp, other._bufp); |
185 | } | |
186 | }; | |
187 | ||
188 | /// \brief Write side of a \ref seastar::pipe | |
189 | /// | |
190 | /// The write side of a pipe, which allows only writing to the pipe. | |
191 | /// A pipe_writer object cannot be created separately, but only as part of a | |
192 | /// reader/writer pair through \ref seastar::pipe. | |
193 | template <typename T> | |
194 | class pipe_writer { | |
195 | private: | |
196 | internal::pipe_buffer<T> *_bufp; | |
20effc67 | 197 | pipe_writer(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { } |
11fdf7f2 TL |
198 | friend class pipe<T>; |
199 | public: | |
200 | /// \brief Write an item to the pipe | |
201 | /// | |
202 | /// Returns a future value, which is fulfilled when the data was written | |
203 | /// to the buffer (when it become non-full). If the data could not be | |
204 | /// written because the read side was closed, an exception | |
205 | /// \ref broken_pipe_exception is returned in the future. | |
206 | future<> write(T&& data) { | |
207 | if (_bufp->writeable()) { | |
208 | return _bufp->write(std::move(data)); | |
209 | } else { | |
210 | return make_exception_future<>(broken_pipe_exception()); | |
211 | } | |
212 | } | |
213 | ~pipe_writer() { | |
214 | if (_bufp && _bufp->close_write()) { | |
215 | delete _bufp; | |
216 | } | |
217 | } | |
218 | // Allow move, but not copy, of pipe_writer | |
20effc67 | 219 | pipe_writer(pipe_writer&& other) noexcept : _bufp(other._bufp) { |
11fdf7f2 TL |
220 | other._bufp = nullptr; |
221 | } | |
20effc67 | 222 | pipe_writer& operator=(pipe_writer&& other) noexcept { |
11fdf7f2 TL |
223 | std::swap(_bufp, other._bufp); |
224 | } | |
225 | }; | |
226 | ||
227 | /// \brief A fixed-size pipe for communicating between two fibers. | |
228 | /// | |
229 | /// A pipe<T> is a mechanism to transfer data between two fibers, one | |
230 | /// producing data, and the other consuming it. The fixed-size buffer also | |
231 | /// ensures a balanced execution of the two fibers, because the producer | |
232 | /// fiber blocks when it writes to a full pipe, until the consumer fiber gets | |
233 | /// to run and read from the pipe. | |
234 | /// | |
235 | /// A pipe<T> resembles a Unix pipe, in that it has a read side, a write side, | |
236 | /// and a fixed-sized buffer between them, and supports either end to be closed | |
237 | /// independently (and EOF or broken pipe when using the other side). | |
238 | /// A pipe<T> object holds the reader and write sides of the pipe as two | |
239 | /// separate objects. These objects can be moved into two different fibers. | |
240 | /// Importantly, if one of the pipe ends is destroyed (i.e., the continuations | |
241 | /// capturing it end), the other end of the pipe will stop blocking, so the | |
242 | /// other fiber will not hang. | |
243 | /// | |
244 | /// The pipe's read and write interfaces are future-based blocking. I.e., the | |
245 | /// write() and read() methods return a future which is fulfilled when the | |
246 | /// operation is complete. The pipe is single-reader single-writer, meaning | |
247 | /// that until the future returned by read() is fulfilled, read() must not be | |
248 | /// called again (and same for write). | |
249 | /// | |
250 | /// Note: The pipe reader and writer are movable, but *not* copyable. It is | |
251 | /// often convenient to wrap each end in a shared pointer, so it can be | |
252 | /// copied (e.g., used in an std::function which needs to be copyable) or | |
253 | /// easily captured into multiple continuations. | |
254 | template <typename T> | |
255 | class pipe { | |
256 | public: | |
257 | pipe_reader<T> reader; | |
258 | pipe_writer<T> writer; | |
259 | explicit pipe(size_t size) : pipe(new internal::pipe_buffer<T>(size)) { } | |
260 | private: | |
20effc67 | 261 | pipe(internal::pipe_buffer<T> *bufp) noexcept : reader(bufp), writer(bufp) { } |
11fdf7f2 TL |
262 | }; |
263 | ||
264 | ||
265 | /// @} | |
266 | ||
267 | } // namespace seastar |