]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/dpdk/stream.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / dpdk / stream.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 /*
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
21 */
22
23 #ifndef CEPH_MSG_STREAM_H_
24 #define CEPH_MSG_STREAM_H_
25
26 #include <exception>
27 #include <cassert>
28
29 // A stream<> is the producer side. It may call produce() as long
30 // as the returned from the previous invocation is ready.
31 // To signify no more data is available, call close().
32 //
33 // A subscription<> is the consumer side. It is created by a call
34 // to stream::listen(). Calling subscription::start(),
35 // which registers the data processing callback, starts processing
36 // events. It may register for end-of-stream notifications by
37 // return the when_done() future, which also delivers error
38 // events (as exceptions).
39 //
40 // The consumer can pause generation of new data by returning
41 // positive integer; when it becomes ready, the producer
42 // will resume processing.
43
44 template <typename... T>
45 class subscription;
46
47 template <typename... T>
48 class stream {
49 subscription<T...>* _sub = nullptr;
50 int done;
51 bool ready;
52 public:
53 using next_fn = std::function<int (T...)>;
54 stream() = default;
55 stream(const stream&) = delete;
56 stream(stream&&) = delete;
57 ~stream() {
58 if (_sub) {
59 _sub->_stream = nullptr;
60 }
61 }
62
63 void operator=(const stream&) = delete;
64 void operator=(stream&&) = delete;
65
66 // Returns a subscription that reads value from this
67 // stream.
68 subscription<T...> listen() {
69 return subscription<T...>(this);
70 }
71
72 // Returns a subscription that reads value from this
73 // stream, and also sets up the listen function.
74 subscription<T...> listen(next_fn next) {
75 auto sub = subscription<T...>(this);
76 sub.start(std::move(next));
77 return sub;
78 }
79
80 // Becomes ready when the listener is ready to accept
81 // values. Call only once, when beginning to produce
82 // values.
83 bool started() {
84 return ready;
85 }
86
87 // Produce a value. Call only after started(), and after
88 // a previous produce() is ready.
89 int produce(T... data) {
90 return _sub->_next(std::move(data)...);
91 }
92
93 // End the stream. Call only after started(), and after
94 // a previous produce() is ready. No functions may be called
95 // after this.
96 void close() {
97 done = 1;
98 }
99
100 // Signal an error. Call only after started(), and after
101 // a previous produce() is ready. No functions may be called
102 // after this.
103 void set_exception(int error) {
104 done = error;
105 }
106 private:
107 void start();
108 friend class subscription<T...>;
109 };
110
111 template <typename... T>
112 class subscription {
113 public:
114 using next_fn = typename stream<T...>::next_fn;
115 private:
116 stream<T...>* _stream;
117 next_fn _next;
118 private:
119 explicit subscription(stream<T...>* s): _stream(s) {
120 ceph_assert(!_stream->_sub);
121 _stream->_sub = this;
122 }
123
124 public:
125 subscription(subscription&& x)
126 : _stream(x._stream), _next(std::move(x._next)) {
127 x._stream = nullptr;
128 if (_stream) {
129 _stream->_sub = this;
130 }
131 }
132 ~subscription() {
133 if (_stream) {
134 _stream->_sub = nullptr;
135 }
136 }
137
138 /// \brief Start receiving events from the stream.
139 ///
140 /// \param next Callback to call for each event
141 void start(std::function<int (T...)> next) {
142 _next = std::move(next);
143 _stream->ready = true;
144 }
145
146 // Becomes ready when the stream is empty, or when an error
147 // happens (in that case, an exception is held).
148 int done() {
149 return _stream->done;
150 }
151
152 friend class stream<T...>;
153 };
154
155 #endif /* CEPH_MSG_STREAM_H_ */