]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/pipe.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / core / pipe.hh
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
25#include <seastar/core/queue.hh>
26
27#include <seastar/util/std-compat.hh>
28
29/// \defgroup fiber-module Fibers
30///
31/// \brief Fibers of execution
32///
33/// Seastar continuations are normally short, but often chained to one
34/// another, so that one continuation does a bit of work and then schedules
35/// another continuation for later. Such chains can be long, and often even
36/// involve loopings - see for example \ref repeat. We call such chains
37/// "fibers" of execution.
38///
39/// These fibers are not threads - each is just a string of continuations -
40/// but they share some common requirements with traditional threads.
41/// For example, we want to avoid one fiber getting starved while a second
42/// fiber continuously runs its continuations one after another.
43/// As another example, fibers may want to communicate - e.g., one fiber
44/// produces data that a second fiber consumes, and we wish to ensure that
45/// both fibers get a chance to run, and that if one stops prematurely,
46/// the other doesn't hang forever.
47///
48/// Consult the following table to see which APIs are useful for fiber tasks:
49///
50/// Task | APIs
51/// -----------------------------------------------|-------------------
52/// Repeat a blocking task indefinitely | \ref keep_doing()
53/// Repeat a blocking task, then exit | \ref repeat(), \ref do_until()
54/// Provide mutual exclusion between two tasks | \ref semaphore, \ref shared_mutex
55/// Pass a stream of data between two fibers | \ref seastar::pipe
f67539c2 56/// Safely shut down a resource | \ref seastar::gate
11fdf7f2
TL
57/// Hold on to an object while a fiber is running | \ref do_with()
58///
59
60/// Seastar API namespace
61namespace seastar {
62
63/// \addtogroup fiber-module
64/// @{
65
66class broken_pipe_exception : public std::exception {
67public:
68 virtual const char* what() const noexcept {
69 return "Broken pipe";
70 }
71};
72
73class unread_overflow_exception : public std::exception {
74public:
75 virtual const char* what() const noexcept {
76 return "pipe_reader::unread() overflow";
77 }
78};
79
80/// \cond internal
81namespace internal {
82template <typename T>
83class pipe_buffer {
84private:
f67539c2 85 queue<std::optional<T>> _buf;
11fdf7f2
TL
86 bool _read_open = true;
87 bool _write_open = true;
88public:
89 pipe_buffer(size_t size) : _buf(size) {}
f67539c2 90 future<std::optional<T>> read() {
11fdf7f2
TL
91 return _buf.pop_eventually();
92 }
93 future<> write(T&& data) {
94 return _buf.push_eventually(std::move(data));
95 }
96 bool readable() const {
97 return _write_open || !_buf.empty();
98 }
99 bool writeable() const {
100 return _read_open;
101 }
102 bool close_read() {
103 // If a writer blocking (on a full queue), need to stop it.
104 if (_buf.full()) {
105 _buf.abort(std::make_exception_ptr(broken_pipe_exception()));
106 }
107 _read_open = false;
108 return !_write_open;
109 }
110 bool close_write() {
111 // If the queue is empty, write the EOF (disengaged optional) to the
112 // queue to wake a blocked reader. If the queue is not empty, there is
113 // no need to write the EOF to the queue - the reader will return an
114 // EOF when it sees that _write_open == false.
115 if (_buf.empty()) {
116 _buf.push({});
117 }
118 _write_open = false;
119 return !_read_open;
120 }
121};
122} // namespace internal
123/// \endcond
124
125template <typename T>
126class pipe;
127
128/// \brief Read side of a \ref seastar::pipe
129///
130/// The read side of a pipe, which allows only reading from the pipe.
131/// A pipe_reader object cannot be created separately, but only as part of a
132/// reader/writer pair through \ref seastar::pipe.
133template <typename T>
134class pipe_reader {
135private:
136 internal::pipe_buffer<T> *_bufp;
f67539c2 137 std::optional<T> _unread;
20effc67 138 pipe_reader(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { }
11fdf7f2
TL
139 friend class pipe<T>;
140public:
141 /// \brief Read next item from the pipe
142 ///
143 /// Returns a future value, which is fulfilled when the pipe's buffer
144 /// becomes non-empty, or the write side is closed. The value returned
145 /// is an optional<T>, which is disengaged to mark and end of file
146 /// (i.e., the write side was closed, and we've read everything it sent).
f67539c2 147 future<std::optional<T>> read() {
11fdf7f2
TL
148 if (_unread) {
149 auto ret = std::move(*_unread);
150 _unread = {};
f67539c2 151 return make_ready_future<std::optional<T>>(std::move(ret));
11fdf7f2
TL
152 }
153 if (_bufp->readable()) {
154 return _bufp->read();
155 } else {
f67539c2 156 return make_ready_future<std::optional<T>>();
11fdf7f2
TL
157 }
158 }
159 /// \brief Return an item to the front of the pipe
160 ///
161 /// Pushes the given item to the front of the pipe, so it will be
162 /// returned by the next read() call. The typical use case is to
163 /// unread() the last item returned by read().
164 /// More generally, it is legal to unread() any item, not just one
165 /// previously returned by read(), but note that the unread() is limited
166 /// to just one item - two calls to unread() without an intervening call
167 /// to read() will cause an exception.
168 void unread(T&& item) {
169 if (_unread) {
170 throw unread_overflow_exception();
171 }
172 _unread = std::move(item);
173 }
174 ~pipe_reader() {
175 if (_bufp && _bufp->close_read()) {
176 delete _bufp;
177 }
178 }
179 // Allow move, but not copy, of pipe_reader
20effc67 180 pipe_reader(pipe_reader&& other) noexcept : _bufp(other._bufp) {
11fdf7f2
TL
181 other._bufp = nullptr;
182 }
20effc67 183 pipe_reader& operator=(pipe_reader&& other) noexcept {
11fdf7f2
TL
184 std::swap(_bufp, other._bufp);
185 }
186};
187
188/// \brief Write side of a \ref seastar::pipe
189///
190/// The write side of a pipe, which allows only writing to the pipe.
191/// A pipe_writer object cannot be created separately, but only as part of a
192/// reader/writer pair through \ref seastar::pipe.
193template <typename T>
194class pipe_writer {
195private:
196 internal::pipe_buffer<T> *_bufp;
20effc67 197 pipe_writer(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { }
11fdf7f2
TL
198 friend class pipe<T>;
199public:
200 /// \brief Write an item to the pipe
201 ///
202 /// Returns a future value, which is fulfilled when the data was written
203 /// to the buffer (when it become non-full). If the data could not be
204 /// written because the read side was closed, an exception
205 /// \ref broken_pipe_exception is returned in the future.
206 future<> write(T&& data) {
207 if (_bufp->writeable()) {
208 return _bufp->write(std::move(data));
209 } else {
210 return make_exception_future<>(broken_pipe_exception());
211 }
212 }
213 ~pipe_writer() {
214 if (_bufp && _bufp->close_write()) {
215 delete _bufp;
216 }
217 }
218 // Allow move, but not copy, of pipe_writer
20effc67 219 pipe_writer(pipe_writer&& other) noexcept : _bufp(other._bufp) {
11fdf7f2
TL
220 other._bufp = nullptr;
221 }
20effc67 222 pipe_writer& operator=(pipe_writer&& other) noexcept {
11fdf7f2
TL
223 std::swap(_bufp, other._bufp);
224 }
225};
226
227/// \brief A fixed-size pipe for communicating between two fibers.
228///
229/// A pipe<T> is a mechanism to transfer data between two fibers, one
230/// producing data, and the other consuming it. The fixed-size buffer also
231/// ensures a balanced execution of the two fibers, because the producer
232/// fiber blocks when it writes to a full pipe, until the consumer fiber gets
233/// to run and read from the pipe.
234///
235/// A pipe<T> resembles a Unix pipe, in that it has a read side, a write side,
236/// and a fixed-sized buffer between them, and supports either end to be closed
237/// independently (and EOF or broken pipe when using the other side).
238/// A pipe<T> object holds the reader and write sides of the pipe as two
239/// separate objects. These objects can be moved into two different fibers.
240/// Importantly, if one of the pipe ends is destroyed (i.e., the continuations
241/// capturing it end), the other end of the pipe will stop blocking, so the
242/// other fiber will not hang.
243///
244/// The pipe's read and write interfaces are future-based blocking. I.e., the
245/// write() and read() methods return a future which is fulfilled when the
246/// operation is complete. The pipe is single-reader single-writer, meaning
247/// that until the future returned by read() is fulfilled, read() must not be
248/// called again (and same for write).
249///
250/// Note: The pipe reader and writer are movable, but *not* copyable. It is
251/// often convenient to wrap each end in a shared pointer, so it can be
252/// copied (e.g., used in an std::function which needs to be copyable) or
253/// easily captured into multiple continuations.
254template <typename T>
255class pipe {
256public:
257 pipe_reader<T> reader;
258 pipe_writer<T> writer;
259 explicit pipe(size_t size) : pipe(new internal::pipe_buffer<T>(size)) { }
260private:
20effc67 261 pipe(internal::pipe_buffer<T> *bufp) noexcept : reader(bufp), writer(bufp) { }
11fdf7f2
TL
262};
263
264
265/// @}
266
267} // namespace seastar