# RPC streaming

## Streaming API

### Sink and Source

The basic elements of the streaming API are `rpc::sink` and `rpc::source`. The former
is used to send data and the latter to receive it. The client and the server
each have their own pair of sink and source. `rpc::sink` and `rpc::source` are
class templates whose template parameters describe the types of the data
that is sent or received. For instance, a sink used to send messages
containing an `int` and a `long` has the type `rpc::sink<int, long>`. The
opposite end of the stream has a source of type `rpc::source<int, long>`,
which is used to receive those messages. Messages are received at a
source as a `std::optional` containing the actual message as a `std::tuple`. An unengaged
optional means EOS (end of stream): the stream was closed by the peer. If an
error happens before EOS is received, the receiver cannot be sure it received all
the data.

To send data using an `rpc::sink<int, long>`, one can write (assuming a `seastar::async` context):

```cpp
while (has_data()) {
    int data1 = get_data1();
    long data2 = get_data2();
    sink(data1, data2).get(); // sends data
}
sink.close().get(); // closes stream
```

To receive:

```cpp
while (true) {
    std::optional<std::tuple<int, long>> data = source().get0();
    if (!data) {
        // unengaged optional means EOS
        break;
    } else {
        auto [data1, data2] = *data;
        // process data
    }
}
```

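The EOS convention above can be tried without a running Seastar application. What follows is a minimal sketch in plain C++17, not Seastar code: `toy_channel`, `toy_sink`, `toy_source` and `drain_demo` are hypothetical names, calls are synchronous instead of returning futures, and an in-memory queue stands in for the connection. It only illustrates that each message arrives as an engaged `std::optional<std::tuple<...>>` and that an unengaged optional marks the end of the stream.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <optional>
#include <stdexcept>
#include <tuple>
#include <utility>
#include <vector>

// Hypothetical stand-in for one direction of a stream (assumption, not Seastar).
template <typename... T>
struct toy_channel {
    std::deque<std::tuple<T...>> queue; // messages in flight
    bool closed = false;                // set once the sender closes
};

// Sending end: mirrors the shape of sink(data...) and sink.close().
template <typename... T>
class toy_sink {
    std::shared_ptr<toy_channel<T...>> _ch;
public:
    explicit toy_sink(std::shared_ptr<toy_channel<T...>> ch) : _ch(std::move(ch)) {}
    void operator()(const T&... data) {
        if (_ch->closed) {
            throw std::logic_error("send on a closed stream");
        }
        _ch->queue.emplace_back(data...);
    }
    void close() { _ch->closed = true; }
};

// Receiving end: an unengaged optional signals EOS.
template <typename... T>
class toy_source {
    std::shared_ptr<toy_channel<T...>> _ch;
public:
    explicit toy_source(std::shared_ptr<toy_channel<T...>> ch) : _ch(std::move(ch)) {}
    std::optional<std::tuple<T...>> operator()() {
        if (_ch->queue.empty()) {
            if (_ch->closed) {
                return std::nullopt; // EOS: the peer closed the stream
            }
            // A real source would wait for data; the toy model cannot.
            throw std::runtime_error("no data available yet");
        }
        auto msg = std::move(_ch->queue.front());
        _ch->queue.pop_front();
        return msg;
    }
};

// Sends three messages, closes the stream, then drains the source until EOS.
inline std::vector<long> drain_demo() {
    auto ch = std::make_shared<toy_channel<int, long>>();
    toy_sink<int, long> sink(ch);
    toy_source<int, long> source(ch);
    for (int i = 0; i < 3; ++i) {
        sink(i, 10L * i);
    }
    sink.close();
    std::vector<long> sums;
    while (auto data = source()) { // loop ends on the unengaged optional
        auto [data1, data2] = *data;
        sums.push_back(data1 + data2);
    }
    return sums;
}
```

In this model `drain_demo()` produces one entry per message sent before `close()`, and the receive loop stops on the first unengaged optional, which is the same control flow as the receive loop shown earlier.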
### Creating a stream

To open an RPC stream, an RPC client must already exist. The stream
is associated with the client and is aborted if the client is closed
before the stream is. Given an RPC client `rc` and a `serializer` class that models the Serializer concept (as explained in the `rpc::protocol` class), one creates an `rpc::sink` as follows
(again assuming a `seastar::async` context):

```cpp
rpc::sink<int, long> sink = rc.make_stream_sink<serializer, int, long>().get0();
```

Now the client has a sink that can be used for streaming data to
the server, but how does the server get the corresponding `rpc::source` to
read it? For that, the sink must be passed to the server in an RPC
call. The server registers an RPC handler that receives the sink
along with any auxiliary information deemed necessary.
To receive the sink above, one may register an RPC handler like this:

```cpp
rpc_proto.register_handler(1, [] (int aux_data, rpc::source<int, long> source) {
});
```

Notice that the `rpc::sink` is received as an `rpc::source`, since on the server
side it is used for receiving. All that is left is for the client to
invoke this RPC handler with `aux_data` and the sink.

But what about communicating in the other direction, from the server to the
client? For that the server also needs a sink and the client needs
a source, and since messages in this direction may be of a different type
than those from client to server, the sink and the source may be of a different
type as well.

The server initiates creation of the communication channel in the other direction.
It does this by creating a sink from the source it receives and returning that sink
from the RPC handler, which causes it to be received as a source by the client. Let's look
at a full example where the server wants to send messages containing an `sstring` to the client.

The server handler will look like this:

```cpp
rpc_proto.register_handler(1, [] (int aux_data, rpc::source<int, long> source) {
    rpc::sink<sstring> sink = source.make_sink<serializer, sstring>();
    // use sink and source asynchronously
    return sink;
});
```

The client code will be:

```cpp
// The sink type in the signature matches the handler's rpc::source<int, long>.
auto rpc_call = rpc_proto.make_client<rpc::source<sstring> (int, rpc::sink<int, long>)>(1);
rpc::sink<int, long> sink = rc.make_stream_sink<serializer, int, long>().get0();
rpc::source<sstring> source = rpc_call(rc, aux_data, sink).get0();
// use sink and source here
```

## Implementation notes

### RPC stream creation

An RPC stream is implemented as a separate TCP connection. The RPC server knows that a connection
will be used for streaming if the `Stream parent` feature is present during RPC negotiation.
The feature contains the ID of the RPC client that was used to create the stream.

So in the example from the previous section:

```cpp
rpc::sink<int, long> sink = rc.make_stream_sink<serializer, int, long>().get0();
```

the call initiates a new TCP connection to the same server that `rc` is connected to. During RPC
protocol negotiation this connection carries the `Stream parent` feature with `rc`'s ID as its value.

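As a rough illustration of the check described above, the sketch below models the negotiated features as a map from feature name to value and asks whether `Stream parent` is present. This is an assumption made purely for the example: the `feature_map` type, the `stream_parent` helper and the 64-bit ID are hypothetical and do not reflect the actual layout of Seastar's negotiation frame.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Hypothetical model of the features exchanged during negotiation (assumption).
using feature_map = std::map<std::string, std::uint64_t>;

// Returns the parent client's ID if the connection was opened for streaming,
// or an unengaged optional for an ordinary RPC connection.
inline std::optional<std::uint64_t> stream_parent(const feature_map& negotiated) {
    auto it = negotiated.find("Stream parent");
    if (it == negotiated.end()) {
        return std::nullopt; // feature absent: not a stream connection
    }
    return it->second;       // feature present: value is the parent's ID
}
```

A server-side accept path built on this idea would branch on whether `stream_parent(...)` is engaged, attaching the new connection to the parent client it names.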
### Passing sink/source over RPC call

When an `rpc::sink` is sent over an RPC call, it is serialized as its connection ID. The server's RPC handler
then looks up the connection and creates an `rpc::source` from it. When an RPC handler returns an `rpc::sink`,
the same happens in the other direction.
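
The ID-based hand-off can be sketched with a small registry. The code below is a toy model under stated assumptions, not Seastar's implementation: `connection_id` as a 64-bit integer, `toy_connection`, `toy_sink_handle`, `toy_source_handle`, `serialize_sink` and `toy_server` are all hypothetical names. It shows only the idea that the ID is what travels in the RPC message and that the receiving side resolves it to a stream connection it already accepted.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>

using connection_id = std::uint64_t; // assumed representation of the ID

// Hypothetical stand-in for one end of a stream connection.
struct toy_connection {
    connection_id id;
    std::string peer;
};

// Hypothetical handles wrapping the connection a sink or source uses.
struct toy_sink_handle { std::shared_ptr<toy_connection> conn; };
struct toy_source_handle { std::shared_ptr<toy_connection> conn; };

// Sender side: only the connection ID goes into the RPC message.
inline connection_id serialize_sink(const toy_sink_handle& sink) {
    return sink.conn->id;
}

// Receiver side: keeps accepted stream connections indexed by ID.
class toy_server {
    std::unordered_map<connection_id, std::shared_ptr<toy_connection>> _streams;
public:
    void accept_stream(std::shared_ptr<toy_connection> c) {
        assert(c != nullptr);
        connection_id id = c->id;
        _streams.emplace(id, std::move(c));
    }
    // Looks up the connection named in the message and wraps it as a source.
    toy_source_handle deserialize_source(connection_id id) const {
        auto it = _streams.find(id);
        if (it == _streams.end()) {
            throw std::out_of_range("unknown stream connection");
        }
        return toy_source_handle{it->second};
    }
};
```

The reverse direction would look the same with the roles swapped: the handler's returned sink is reduced to an ID, and the client resolves that ID against its own stream connections.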