# RPC streaming

## Streaming API

### Sink and Source

The basic elements of the streaming API are `rpc::sink` and `rpc::source`. The former
is used to send data and the latter to receive it. The client and the server
each have their own pair of sink and source. `rpc::sink` and `rpc::source` are
templated classes whose template parameters describe the type of the data
that is sent or received. For instance, a sink that is used to send messages
containing `int` and `long` will be of type `rpc::sink<int, long>`. The
opposite end of the stream will have a source of type `rpc::source<int, long>`,
which will be used to receive those messages. Messages are received at a
source as a `std::optional` containing the actual message as a `std::tuple`. An unengaged
optional means EOS (end of stream): the stream was closed by the peer. If
an error happens before EOS is received, the receiver cannot be sure it received all
the data.

To send data using `rpc::sink<int, long>` one can write (assuming a `seastar::async` context):

```cpp
    while (has_data()) {
        int data1 = get_data1();
        long data2 = get_data2();
        sink(data1, data2).get(); // sends data
    }
    sink.close().get(); // closes stream
```

To receive:

```cpp
    while (true) {
        std::optional<std::tuple<int, long>> data = source().get0();
        if (!data) {
            // unengaged optional means EOS
            break;
        } else {
            auto [data1, data2] = *data;
            // process data
        }
    }
```
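
The EOS convention can be tried out without Seastar. The following is a minimal sketch using only the standard library: `fake_source` and `drain` are hypothetical names introduced for illustration, and `fake_source` is a synchronous in-memory stand-in, not the real `rpc::source` (which returns a future). It only demonstrates the receive loop terminating on an unengaged optional.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <tuple>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for rpc::source<int, long>; not Seastar API.
class fake_source {
    std::deque<std::tuple<int, long>> _pending;
public:
    explicit fake_source(std::deque<std::tuple<int, long>> msgs)
        : _pending(std::move(msgs)) {}

    // An engaged optional carries one message; an unengaged one signals EOS.
    std::optional<std::tuple<int, long>> operator()() {
        if (_pending.empty()) {
            return std::nullopt;
        }
        std::tuple<int, long> msg = _pending.front();
        _pending.pop_front();
        return msg;
    }
};

// Reads from the source until EOS and returns the messages in arrival order.
std::vector<std::tuple<int, long>> drain(fake_source& source) {
    std::vector<std::tuple<int, long>> received;
    while (true) {
        std::optional<std::tuple<int, long>> data = source();
        if (!data) {
            break; // unengaged optional means EOS
        }
        auto [data1, data2] = *data;
        received.emplace_back(data1, data2);
    }
    return received;
}
```

In this model a drained source keeps returning an unengaged optional. That is a property of the sketch only and should not be read as a statement about `rpc::source`.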

### Creating a stream

To open an RPC stream, an RPC client must already exist. The stream
will be associated with the client and will be aborted if the client is closed
before the stream is. Given an RPC client `rc` and a `serializer` class that models
the Serializer concept (as explained in the `rpc::protocol` class), one creates an
`rpc::sink` as follows (again assuming a `seastar::async` context):

```cpp
    rpc::sink<int, long> sink = rc.make_stream_sink<serializer, int, long>().get0();
```

Now the client has a sink that can be used for streaming data to
the server, but how will the server get the corresponding `rpc::source` to
read it? For that, the sink should be passed to the server by an RPC
call. To receive a sink, the server should register an RPC handler that
accepts it along with any auxiliary information deemed necessary.
To receive the sink above, one may register an RPC handler like this:

```cpp
    rpc_proto.register_handler(1, [] (int aux_data, rpc::source<int, long> source) {
    });
```

Notice that the `rpc::sink` is received as an `rpc::source`, since on the server
side it will be used for receiving. All that is left is for the client to
invoke this RPC handler with `aux_data` and the sink.

But what about communicating in the other direction, from the server to the
client? For that, the server also needs a sink and the client needs
a source. Since messages in this direction may be of a different type
than those sent from client to server, the sink and the source may be of a different
type as well.

The server initiates creation of the communication channel in the other direction.
It does this by creating a sink from the source it receives and returning that sink
from the RPC handler, which causes it to be received as a source by the client. Let's look
at a full example where the server wants to send messages containing an `sstring` to the client.

The server handler will look like this:

```cpp
    rpc_proto.register_handler(1, [] (int aux_data, rpc::source<int, long> source) {
        rpc::sink<sstring> sink = source.make_sink<serializer, sstring>();
        // use sink and source asynchronously
        return sink;
    });
```

Client code will be:

```cpp
    auto rpc_call = rpc_proto.make_client<rpc::source<sstring> (int, rpc::sink<int, long>)>(1);
    rpc::sink<int, long> sink = rc.make_stream_sink<serializer, int, long>().get0();
    rpc::source<sstring> source = rpc_call(rc, aux_data, sink).get0();
    // use sink and source here
```

## Implementation notes

### RPC stream creation

An RPC stream is implemented as a separate TCP connection. The RPC server knows that a connection
will be used for streaming if the `Stream parent` feature is present during RPC negotiation.
The feature contains the ID of the RPC client that was used to create the stream.

So in the example from the previous chapter:

```cpp
    rpc::sink<int, long> sink = rc.make_stream_sink<serializer, int, long>().get0();
```

the call will initiate a new TCP connection to the same server that `rc` is connected to. During RPC
protocol negotiation this connection will carry the `Stream parent` feature with `rc`'s ID as its value.
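
The decision the server makes during negotiation can be illustrated with a small standard-library model. This is only a sketch: the `feature` enum, `feature_map`, `features_for_stream` and `stream_parent_of` are hypothetical names, and the map is an assumed simplification that says nothing about the actual negotiation frame layout.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>

// Hypothetical feature tag; stands in for the `Stream parent` feature.
enum class feature { stream_parent };

// Assumed simplification: negotiated features as a tag -> value map.
using feature_map = std::map<feature, std::uint64_t>;

// Client side: features a newly opened stream connection would advertise,
// carrying the ID of the RPC client that created the stream.
feature_map features_for_stream(std::uint64_t parent_client_id) {
    return {{feature::stream_parent, parent_client_id}};
}

// Server side: returns the parent client's ID when the connection announces
// itself as a stream, or nullopt for an ordinary RPC connection.
std::optional<std::uint64_t> stream_parent_of(const feature_map& negotiated) {
    auto it = negotiated.find(feature::stream_parent);
    if (it == negotiated.end()) {
        return std::nullopt;
    }
    return it->second;
}
```

A connection whose negotiated features lack the tag is treated as a regular RPC connection in this model. One that carries it is tied to the parent client identified by the value.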

### Passing sink/source over RPC call

When an `rpc::sink` is sent over an RPC call, it is serialized as its connection ID. The server's RPC handler
then looks up the connection and creates an `rpc::source` from it. When the RPC handler returns an `rpc::sink`,
the same happens in the other direction.
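
The lookup described above can be sketched with a plain registry keyed by connection ID. All names here (`connection`, `sink_handle`, `source_handle`, `server_registry`) are hypothetical and exist only for illustration. Throwing on an unknown ID is an assumption of the sketch, not documented behavior.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <unordered_map>
#include <utility>

// Hypothetical stand-ins; none of these are Seastar types.
struct connection { std::uint64_t id; };
struct sink_handle { std::shared_ptr<connection> conn; };
struct source_handle { std::shared_ptr<connection> conn; };

// On the wire, a sink is represented only by its connection ID.
std::uint64_t serialize(const sink_handle& sink) {
    return sink.conn->id;
}

// Server-side table of stream connections, keyed by connection ID.
class server_registry {
    std::unordered_map<std::uint64_t, std::shared_ptr<connection>> _conns;
public:
    void add(std::shared_ptr<connection> conn) {
        const std::uint64_t id = conn->id;
        _conns.emplace(id, std::move(conn));
    }

    // Looks up the connection for a received ID and wraps it as a source.
    // Assumption: an unknown ID is reported by throwing.
    source_handle deserialize_as_source(std::uint64_t id) const {
        auto it = _conns.find(id);
        if (it == _conns.end()) {
            throw std::out_of_range("unknown stream connection ID");
        }
        return source_handle{it->second};
    }
};
```

The client and the server each hold their own object for the same stream connection, so only the ID needs to cross the RPC call.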