/// Repeat a blocking task, then exit | \ref repeat(), \ref do_until()
/// Provide mutual exclusion between two tasks | \ref semaphore, \ref shared_mutex
/// Pass a stream of data between two fibers | \ref seastar::pipe
-/// Safely shut down a resource | \ref seastar::gate, \ref seastar::with_gate()
+/// Safely shut down a resource | \ref seastar::gate
/// Hold on to an object while a fiber is running | \ref do_with()
///
template <typename T>
class pipe_buffer {
private:
- queue<compat::optional<T>> _buf;
+ queue<std::optional<T>> _buf;
bool _read_open = true;
bool _write_open = true;
public:
pipe_buffer(size_t size) : _buf(size) {}
- future<compat::optional<T>> read() {
+ future<std::optional<T>> read() {
return _buf.pop_eventually();
}
future<> write(T&& data) {
class pipe_reader {
private:
internal::pipe_buffer<T> *_bufp;
- compat::optional<T> _unread;
- pipe_reader(internal::pipe_buffer<T> *bufp) : _bufp(bufp) { }
+ std::optional<T> _unread;
+ pipe_reader(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { }
friend class pipe<T>;
public:
/// \brief Read next item from the pipe
/// becomes non-empty, or the write side is closed. The value returned
/// is an optional<T>, which is disengaged to mark and end of file
/// (i.e., the write side was closed, and we've read everything it sent).
- future<compat::optional<T>> read() {
+ future<std::optional<T>> read() {
if (_unread) {
auto ret = std::move(*_unread);
_unread = {};
- return make_ready_future<compat::optional<T>>(std::move(ret));
+ return make_ready_future<std::optional<T>>(std::move(ret));
}
if (_bufp->readable()) {
return _bufp->read();
} else {
- return make_ready_future<compat::optional<T>>();
+ return make_ready_future<std::optional<T>>();
}
}
/// \brief Return an item to the front of the pipe
}
}
// Allow move, but not copy, of pipe_reader
- pipe_reader(pipe_reader&& other) : _bufp(other._bufp) {
+ pipe_reader(pipe_reader&& other) noexcept : _bufp(other._bufp) {
other._bufp = nullptr;
}
- pipe_reader& operator=(pipe_reader&& other) {
+ pipe_reader& operator=(pipe_reader&& other) noexcept {
std::swap(_bufp, other._bufp);
}
};
class pipe_writer {
private:
internal::pipe_buffer<T> *_bufp;
- pipe_writer(internal::pipe_buffer<T> *bufp) : _bufp(bufp) { }
+ pipe_writer(internal::pipe_buffer<T> *bufp) noexcept : _bufp(bufp) { }
friend class pipe<T>;
public:
/// \brief Write an item to the pipe
}
}
// Allow move, but not copy, of pipe_writer
- pipe_writer(pipe_writer&& other) : _bufp(other._bufp) {
+ pipe_writer(pipe_writer&& other) noexcept : _bufp(other._bufp) {
other._bufp = nullptr;
}
- pipe_writer& operator=(pipe_writer&& other) {
+ pipe_writer& operator=(pipe_writer&& other) noexcept {
std::swap(_bufp, other._bufp);
}
};
pipe_writer<T> writer;
explicit pipe(size_t size) : pipe(new internal::pipe_buffer<T>(size)) { }
private:
- pipe(internal::pipe_buffer<T> *bufp) : reader(bufp), writer(bufp) { }
+ pipe(internal::pipe_buffer<T> *bufp) noexcept : reader(bufp), writer(bufp) { }
};