]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/dpdk/stream.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / msg / async / dpdk / stream.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2/*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19/*
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
21 */
7c673cae
FG
22
23#ifndef CEPH_MSG_STREAM_H_
24#define CEPH_MSG_STREAM_H_
25
#include <cassert>
#include <exception>
#include <functional>
#include <utility>
28
// A stream<> is the producer side. It may call produce() as long
// as the value returned from the previous invocation indicates that
// the consumer is ready.
// To signify that no more data is available, call close().
//
// A subscription<> is the consumer side. It is created by a call
// to stream::listen(). Calling subscription::start(),
// which registers the data processing callback, starts processing
// events. The consumer may check for end-of-stream by calling
// done(), which also reports error events (as the error code
// passed to stream::set_exception()).
//
// The consumer can pause generation of new data by returning a
// positive integer from its callback; when it becomes ready, the
// producer will resume processing.
43
// Forward declaration: stream<> and subscription<> refer to each other.
template <typename... T>
class subscription;

/// \brief Producer side of a stream/subscription pair.
///
/// A stream hands each produced value to the callback registered by its
/// (single) subscription. It is neither copyable nor movable, because the
/// attached subscription keeps a raw pointer back to it.
template <typename... T>
class stream {
  // Subscription currently attached to this stream, or nullptr if none.
  // Updated by subscription's constructor, move constructor and destructor.
  subscription<T...>* _sub = nullptr;
  // Completion state reported by subscription::done(): 0 while the stream
  // is open, 1 after close(), or the error code given to set_exception().
  // Initialized explicitly so a default-constructed stream never exposes an
  // indeterminate value.
  int done = 0;
  // True once subscription::start() has registered a callback. Initialized
  // explicitly so started() is well-defined before any listener starts.
  bool ready = false;
 public:
  /// Signature of the consumer callback; its int result is returned
  /// unchanged by produce().
  using next_fn = std::function<int (T...)>;
  stream() = default;
  stream(const stream&) = delete;
  stream(stream&&) = delete;
  ~stream() {
    // Detach the subscription so its destructor does not write through a
    // dangling stream pointer.
    if (_sub) {
      _sub->_stream = nullptr;
    }
  }

  void operator=(const stream&) = delete;
  void operator=(stream&&) = delete;

  /// Returns a subscription that reads values from this stream.
  /// The subscription's constructor asserts that no other subscription
  /// is currently attached.
  subscription<T...> listen() {
    return subscription<T...>(this);
  }

  /// Returns a subscription that reads values from this stream, and also
  /// registers \p next as the listen function (equivalent to listen()
  /// followed by subscription::start(next)).
  subscription<T...> listen(next_fn next) {
    auto sub = subscription<T...>(this);
    sub.start(std::move(next));
    return sub;
  }

  /// Returns true once the listener has called subscription::start() and
  /// is therefore ready to accept values; false before that.
  bool started() {
    return ready;
  }

  /// Produce a value. Call only after started() returns true.
  ///
  /// \param data values forwarded (moved) to the subscriber's callback
  /// \return whatever the subscriber's callback returns
  ///
  /// NOTE: _sub is dereferenced without a check, so a subscription must
  /// still be attached when this is called.
  int produce(T... data) {
    return _sub->_next(std::move(data)...);
  }

  /// End the stream: records completion state 1, which subscription::done()
  /// then reports. No further values should be produced after this.
  void close() {
    done = 1;
  }

  /// Signal an error: records \p error as the completion state, which
  /// subscription::done() then reports. No further values should be
  /// produced after this.
  void set_exception(int error) {
    done = error;
  }
 private:
  // Declared but not defined in this header.
  void start();
  friend class subscription<T...>;
};
110
111template <typename... T>
112class subscription {
113 public:
114 using next_fn = typename stream<T...>::next_fn;
115 private:
116 stream<T...>* _stream;
117 next_fn _next;
118 private:
119 explicit subscription(stream<T...>* s): _stream(s) {
11fdf7f2 120 ceph_assert(!_stream->_sub);
7c673cae
FG
121 _stream->_sub = this;
122 }
123
124 public:
125 subscription(subscription&& x)
126 : _stream(x._stream), _next(std::move(x._next)) {
127 x._stream = nullptr;
128 if (_stream) {
129 _stream->_sub = this;
130 }
131 }
132 ~subscription() {
133 if (_stream) {
134 _stream->_sub = nullptr;
135 }
136 }
137
138 /// \brief Start receiving events from the stream.
139 ///
140 /// \param next Callback to call for each event
141 void start(std::function<int (T...)> next) {
142 _next = std::move(next);
143 _stream->ready = true;
144 }
145
146 // Becomes ready when the stream is empty, or when an error
147 // happens (in that case, an exception is held).
148 int done() {
149 return _stream->done;
150 }
151
152 friend class stream<T...>;
153};
154
155#endif /* CEPH_MSG_STREAM_H_ */