# RPC streaming

## Streaming API

### Sink and Source

The basic elements of the streaming API are `rpc::sink` and `rpc::source`. The former
is used to send data and the latter to receive it. The client and the server
each have their own pair of sink and source. `rpc::sink` and `rpc::source` are
class templates whose template parameters describe the type of the data
that is sent or received. For instance, a sink used to send messages
containing an `int` and a `long` has the type `rpc::sink<int, long>`. The
opposite end of the stream has a source of the type `rpc::source<int, long>`,
which is used to receive those messages. Messages are received at a
source as a `compat::optional` containing the actual message as a `std::tuple`. An unengaged
optional means EOS (end of stream): the stream was closed by the peer. If
an error happens before EOS is received, the receiver cannot be sure it received all
the data.

To send data using `rpc::sink<int, long>` one can write (assuming a `seastar::async` context):

```c++
while (has_data()) {
    int data1 = get_data1();
    long data2 = get_data2();
    sink(data1, data2).get(); // sends data
}
sink.close().get(); // closes stream
```

To receive:

```c++
while (true) {
    compat::optional<std::tuple<int, long>> data = source().get0();
    if (!data) {
        // unengaged optional means EOS
        break;
    } else {
        auto [data1, data2] = *data;
        // process data
    }
}
```
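
The contract described in this section (messages arrive as tuples, and an unengaged optional signals EOS) can also be sketched without Seastar. The following is a toy, synchronous model that uses only the standard library. `toy_stream`, `toy_sink`, `toy_source`, `make_toy_stream` and `drain` are hypothetical names and are not part of the Seastar API; the real `rpc::sink` and `rpc::source` calls return futures instead of values.

```cpp
// Toy, single-threaded model of the sink/source contract. NOT Seastar code:
// all names here are hypothetical and every call is synchronous.
#include <deque>
#include <memory>
#include <optional>
#include <stdexcept>
#include <tuple>
#include <utility>
#include <vector>

template <typename... T>
struct toy_stream {
    std::deque<std::tuple<T...>> queue; // messages in flight
    bool closed = false;                // set once the sender closes the stream
};

template <typename... T>
class toy_sink {
    std::shared_ptr<toy_stream<T...>> _s;
public:
    explicit toy_sink(std::shared_ptr<toy_stream<T...>> s) : _s(std::move(s)) {}
    // Sends one message made of all the template parameter types.
    void operator()(T... values) { _s->queue.emplace_back(std::move(values)...); }
    // Marks the end of the stream; the receiver sees EOS after draining the queue.
    void close() { _s->closed = true; }
};

template <typename... T>
class toy_source {
    std::shared_ptr<toy_stream<T...>> _s;
public:
    explicit toy_source(std::shared_ptr<toy_stream<T...>> s) : _s(std::move(s)) {}
    // Returns the next message, or an unengaged optional on EOS.
    std::optional<std::tuple<T...>> operator()() {
        if (_s->queue.empty()) {
            if (_s->closed) {
                return std::nullopt; // EOS: closed and nothing left to read
            }
            // A real source would wait here; the toy model cannot block.
            throw std::logic_error("no data yet and stream not closed");
        }
        auto msg = std::move(_s->queue.front());
        _s->queue.pop_front();
        return msg;
    }
};

// Creates both ends of one toy stream.
template <typename... T>
std::pair<toy_sink<T...>, toy_source<T...>> make_toy_stream() {
    auto s = std::make_shared<toy_stream<T...>>();
    return std::make_pair(toy_sink<T...>(s), toy_source<T...>(s));
}

// Reads messages until EOS, mirroring the receive loop shown above.
template <typename... T>
std::vector<std::tuple<T...>> drain(toy_source<T...>& source) {
    std::vector<std::tuple<T...>> out;
    while (auto msg = source()) {
        out.push_back(std::move(*msg));
    }
    return out;
}
```

In this model the sender calls `sink(1, 10L)` any number of times followed by `sink.close()`, and `drain(source)` then returns every message in order. A further `source()` call keeps returning an unengaged optional.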

### Creating a stream

To open an RPC stream, an RPC client must already exist. The stream
is associated with the client and is aborted if the client is closed
before the stream is. Given an RPC client `rc`, one creates an `rpc::sink` like this
(again assuming a `seastar::async` context):

```c++
rpc::sink<int, long> sink = rc.make_stream_sink<int, long>().get0();
```

Now the client has a sink that can be used for streaming data to
the server, but how will the server get the corresponding `rpc::source` to
read it? For that, the sink should be passed to the server by an RPC
call. To receive a sink, the server should register an RPC handler that
receives it along with any auxiliary information deemed necessary.
To receive the sink above, one may register an RPC handler like this:

```c++
rpc_proto.register_handler(1, [] (int aux_data, rpc::source<int, long> source) {
});
```

Notice that the `rpc::sink` is received as an `rpc::source`, since on the server
side it is used for receiving. Now all that is left to do is for the client to
invoke this RPC handler with `aux_data` and the sink.

But what about communicating in the other direction, from the server to the
client? For that the server also needs a sink and the client needs
a source, and since messages in this direction may be of a different type
than those from client to server, the sink and the source may be of a different
type as well.

The server initiates creation of the communication channel in the other direction.
It does this by creating a sink from the source it receives and returning that sink
from the RPC handler, which causes it to be received as a source by the client. Let's look
at a full example where the server wants to send messages containing an `sstring` to the client.

The server handler will look like this:

```c++
rpc_proto.register_handler(1, [] (int aux_data, rpc::source<int, long> source) {
    rpc::sink<sstring> sink = source.make_sink<sstring>();
    // use sink and source asynchronously
    return sink;
});
```

The client code will be:

```c++
auto rpc_call = rpc_proto.make_client<rpc::source<sstring> (int, rpc::sink<int, long>)>(1);
rpc::sink<int, long> sink = rc.make_stream_sink<int, long>().get0();
rpc::source<sstring> source = rpc_call(rc, aux_data, sink).get0();
// use sink and source here
```

## Implementation notes

### RPC stream creation

An RPC stream is implemented as a separate TCP connection. The RPC server knows that a connection
will be used for streaming if the `Stream parent` feature is present during RPC negotiation.
The feature contains the ID of the RPC client that was used to create the stream.

So in the example from the previous chapter:

```c++
rpc::sink<int, long> sink = rc.make_stream_sink<int, long>().get0();
```

the call initiates a new TCP connection to the same server `rc` is connected to. During RPC
protocol negotiation this connection carries the `Stream parent` feature with `rc`'s ID as its value.
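
As an illustration of this negotiation step, the server-side decision can be modelled as a lookup in the negotiated feature set. This is a toy sketch, not Seastar's wire format or API: `toy_feature`, `toy_features`, `toy_client_id` and `stream_parent_of` are hypothetical names, and representing the feature value as a 64-bit integer is an assumption made for the example.

```cpp
// Toy model of "is this connection a stream, and who is its parent?".
// All names and the integer ID type are assumptions, not Seastar definitions.
#include <cstdint>
#include <map>
#include <optional>

enum class toy_feature {
    other,         // placeholder for any unrelated negotiated feature
    stream_parent, // marks a connection that carries an RPC stream
};

using toy_client_id = std::uint64_t;
using toy_features = std::map<toy_feature, toy_client_id>;

// Returns the parent client's ID if the negotiated features mark the
// connection as a stream, or an unengaged optional for an ordinary connection.
inline std::optional<toy_client_id> stream_parent_of(const toy_features& negotiated) {
    auto it = negotiated.find(toy_feature::stream_parent);
    if (it == negotiated.end()) {
        return std::nullopt;
    }
    return it->second;
}
```

With this model, a connection negotiated with `{toy_feature::stream_parent, 42}` is treated as a stream belonging to client 42, while a connection without that entry is treated as a regular RPC connection.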

### Passing sink/source over RPC call

When an `rpc::sink` is sent over an RPC call, it is serialized as its connection ID. The server's RPC handler
then looks up the connection and creates an `rpc::source` from it. When an RPC handler returns an `rpc::sink`,
the same happens in the other direction.
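
The ID-based handoff can be sketched as follows. This is a minimal standard-library model under stated assumptions, not the Seastar implementation: `toy_connection`, `toy_sink_handle`, `toy_source_handle`, `toy_server`, `serialize` and `deserialize_source` are hypothetical names, and the 64-bit ID type is chosen only for illustration.

```cpp
// Toy model: a sink crosses the wire as a connection ID, and the receiver
// turns that ID back into a source by looking up the connection.
// All names are hypothetical; this is not Seastar code.
#include <cstdint>
#include <optional>
#include <unordered_map>

using toy_connection_id = std::uint64_t; // assumption: the real ID type may differ

struct toy_connection {
    toy_connection_id id;
};

// Sender-side handle: a sink bound to one streaming connection.
struct toy_sink_handle {
    toy_connection_id id;
};

// Receiver-side handle: a source bound to the looked-up connection.
struct toy_source_handle {
    const toy_connection* conn;
};

// Only the connection ID travels inside the RPC call.
inline toy_connection_id serialize(const toy_sink_handle& sink) {
    return sink.id;
}

class toy_server {
    std::unordered_map<toy_connection_id, toy_connection> _connections;
public:
    // Records a streaming connection once it has been accepted.
    void add_connection(toy_connection_id id) {
        _connections.emplace(id, toy_connection{id});
    }
    // Turns a received ID back into a source; unengaged if the ID is unknown.
    std::optional<toy_source_handle> deserialize_source(toy_connection_id id) const {
        auto it = _connections.find(id);
        if (it == _connections.end()) {
            return std::nullopt;
        }
        return toy_source_handle{&it->second};
    }
};
```

In this sketch an unknown ID yields an unengaged optional; how the real implementation reports a failed lookup is not covered by this document.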