]>
git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/dpdk/stream.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
23 #ifndef CEPH_MSG_STREAM_H_
24 #define CEPH_MSG_STREAM_H_
29 // A stream<> is the producer side. It may call produce() as long
30 // as the returned from the previous invocation is ready.
31 // To signify no more data is available, call close().
33 // A subscription<> is the consumer side. It is created by a call
34 // to stream::listen(). Calling subscription::start(),
35 // which registers the data processing callback, starts processing
36 // events. It may register for end-of-stream notifications by
37 // return the when_done() future, which also delivers error
38 // events (as exceptions).
40 // The consumer can pause generation of new data by returning
41 // positive integer; when it becomes ready, the producer
42 // will resume processing.
44 template <typename
... T
>
47 template <typename
... T
>
49 subscription
<T
...>* _sub
= nullptr;
53 using next_fn
= std::function
<int (T
...)>;
55 stream(const stream
&) = delete;
56 stream(stream
&&) = delete;
59 _sub
->_stream
= nullptr;
63 void operator=(const stream
&) = delete;
64 void operator=(stream
&&) = delete;
66 // Returns a subscription that reads value from this
68 subscription
<T
...> listen() {
69 return subscription
<T
...>(this);
72 // Returns a subscription that reads value from this
73 // stream, and also sets up the listen function.
74 subscription
<T
...> listen(next_fn next
) {
75 auto sub
= subscription
<T
...>(this);
76 sub
.start(std::move(next
));
80 // Becomes ready when the listener is ready to accept
81 // values. Call only once, when beginning to produce
87 // Produce a value. Call only after started(), and after
88 // a previous produce() is ready.
89 int produce(T
... data
) {
90 return _sub
->_next(std::move(data
)...);
93 // End the stream. Call only after started(), and after
94 // a previous produce() is ready. No functions may be called
100 // Signal an error. Call only after started(), and after
101 // a previous produce() is ready. No functions may be called
103 void set_exception(int error
) {
108 friend class subscription
<T
...>;
111 template <typename
... T
>
114 using next_fn
= typename stream
<T
...>::next_fn
;
116 stream
<T
...>* _stream
;
119 explicit subscription(stream
<T
...>* s
): _stream(s
) {
120 ceph_assert(!_stream
->_sub
);
121 _stream
->_sub
= this;
125 subscription(subscription
&& x
)
126 : _stream(x
._stream
), _next(std::move(x
._next
)) {
129 _stream
->_sub
= this;
134 _stream
->_sub
= nullptr;
138 /// \brief Start receiving events from the stream.
140 /// \param next Callback to call for each event
141 void start(std::function
<int (T
...)> next
) {
142 _next
= std::move(next
);
143 _stream
->ready
= true;
146 // Becomes ready when the stream is empty, or when an error
147 // happens (in that case, an exception is held).
149 return _stream
->done
;
152 friend class stream
<T
...>;
155 #endif /* CEPH_MSG_STREAM_H_ */