]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | library thrift.test.transport.t_socket_transport_test; | |
19 | ||
20 | import 'dart:async'; | |
21 | import 'dart:convert' show Utf8Codec; | |
22 | import 'dart:typed_data' show Uint8List; | |
23 | ||
24 | import 'package:dart2_constant/convert.dart' show base64; | |
25 | import 'package:dart2_constant/core.dart' as core; | |
26 | import 'package:mockito/mockito.dart'; | |
27 | import 'package:test/test.dart'; | |
28 | import 'package:thrift/thrift.dart'; | |
29 | ||
/// Runs the socket transport test suite against an in-memory [FakeSocket].
///
/// Covers the plain client transport, the client transport wrapped in a
/// framed transport, the async client transport (plain and framed) and the
/// server transport.
void main() {
  const codec = const Utf8Codec();

  // Fixture for data travelling from the transport to the socket.
  final requestText = 'my test request';
  final requestBytes = new Uint8List.fromList(codec.encode(requestText));
  final requestBase64 = base64.encode(requestBytes);

  // Fixture for data travelling from the socket to the transport.
  final responseText = 'response 1';
  final responseBytes = new Uint8List.fromList(codec.encode(responseText));
  final responseBase64 = base64.encode(responseBytes);

  // Same response, prefixed with a frame header.
  final framedResponseBase64 =
      base64.encode(_getFramedResponse(responseBytes));

  // Every group below runs under the same one-second limit.
  final oneSecond = new Timeout(new Duration(seconds: 1));

  // Reads [byteCount] bytes from [source] and decodes them as UTF-8 text.
  String readText(TTransport source, int byteCount) {
    var bytes = new Uint8List(byteCount);
    source.readAll(bytes, 0, byteCount);
    return codec.decode(bytes);
  }

  group('TClientSocketTransport', () {
    FakeSocket fakeSocket;
    TTransport client;

    setUp(() async {
      fakeSocket = new FakeSocket(sync: false);
      await fakeSocket.open();
      client = new TClientSocketTransport(fakeSocket);
      await client.open();
      client.writeAll(requestBytes);
    });

    test('Test client sending data over transport', () async {
      // Written bytes must not have been handed to the socket before flush.
      expect(fakeSocket.sendPayload, isNull);

      final pending = client.flush();

      // Yield once so that queued microtasks get a chance to run.
      await new Future.value();

      expect(fakeSocket.sendPayload, isNotNull);
      expect(fakeSocket.sendPayload, requestBytes);

      // Deliver the fake reply through the socket.
      fakeSocket.receiveFakeMessage(responseBase64);

      await pending;
      expect(readText(client, responseBytes.length), responseText);
    });
  }, timeout: oneSecond);

  group('TClientSocketTransport with FramedTransport', () {
    FakeSocket fakeSocket;
    TTransport client;

    setUp(() async {
      fakeSocket = new FakeSocket(sync: true);
      await fakeSocket.open();

      client = new TFramedTransport(new TClientSocketTransport(fakeSocket));
      await client.open();
      client.writeAll(requestBytes);
    });

    test('Test client sending data over framed transport', () async {
      String decoded;

      final pending = client.flush().then((_) {
        decoded = readText(client, responseBytes.length);
      });

      // Deliver the framed fake reply through the socket.
      fakeSocket.receiveFakeMessage(framedResponseBase64);

      await pending;
      expect(decoded, responseText);
    });
  }, timeout: oneSecond);

  group('TAsyncClientSocketTransport', () {
    FakeSocket fakeSocket;
    FakeProtocolFactory factory;
    TTransport client;

    setUp(() async {
      fakeSocket = new FakeSocket(sync: true);
      await fakeSocket.open();

      factory = new FakeProtocolFactory();
      factory.message = new TMessage('foo', TMessageType.CALL, 123);
      client = new TAsyncClientSocketTransport(
          fakeSocket, new TMessageReader(factory),
          responseTimeout: core.Duration.zero);
      await client.open();
      client.writeAll(requestBytes);
    });

    test('Test response correlates to correct request', () async {
      String decoded;

      final pending = client.flush().then((_) {
        decoded = readText(client, responseBytes.length);
      });

      // First reply: same sequence id (123) as the outstanding call.
      factory.message = new TMessage('foo', TMessageType.REPLY, 123);
      fakeSocket.receiveFakeMessage(responseBase64);

      // Second reply: a different sequence id (124) and different payload.
      var secondText = 'response 2';
      var secondBytes = new Uint8List.fromList(codec.encode(secondText));
      var secondBase64 = base64.encode(secondBytes);
      factory.message = new TMessage('foo2', TMessageType.REPLY, 124);
      fakeSocket.receiveFakeMessage(secondBase64);

      await pending;
      expect(decoded, responseText);
    });

    test('Test response timeout', () async {
      // No reply is ever delivered, so the flush future is expected to fail.
      final pending = client.flush();
      expect(pending, throwsA(new isInstanceOf<TimeoutException>()));
    });
  }, timeout: oneSecond);

  group('TAsyncClientSocketTransport with TFramedTransport', () {
    FakeSocket fakeSocket;
    FakeProtocolFactory factory;
    TTransport client;

    setUp(() async {
      fakeSocket = new FakeSocket(sync: true);
      await fakeSocket.open();

      factory = new FakeProtocolFactory();
      factory.message = new TMessage('foo', TMessageType.CALL, 123);
      // The reader is told to skip the frame header bytes.
      var reader = new TMessageReader(factory,
          byteOffset: TFramedTransport.headerByteCount);

      client = new TFramedTransport(new TAsyncClientSocketTransport(
          fakeSocket, reader,
          responseTimeout: core.Duration.zero));
      await client.open();
      client.writeAll(requestBytes);
    });

    test('Test async client sending data over framed transport', () async {
      String decoded;

      final pending = client.flush().then((_) {
        decoded = readText(client, responseBytes.length);
      });

      // Deliver the framed fake reply through the socket.
      factory.message = new TMessage('foo', TMessageType.REPLY, 123);
      fakeSocket.receiveFakeMessage(framedResponseBase64);

      await pending;
      expect(decoded, responseText);
    });
  }, timeout: oneSecond);

  group('TServerTransport', () {
    test('Test server transport listens to socket', () async {
      var fakeSocket = new FakeSocket();
      await fakeSocket.open();
      expect(fakeSocket.isOpen, isTrue);

      var server = new TServerSocketTransport(fakeSocket);
      expect(server.hasReadData, isFalse);

      fakeSocket.receiveFakeMessage(requestBase64);

      // Yield once so that queued microtasks get a chance to run.
      await new Future.value();

      expect(server.hasReadData, isTrue);
      expect(readText(server, requestBytes.length), requestText);
    });

    test('Test server sending data over transport', () async {
      var fakeSocket = new FakeSocket();
      await fakeSocket.open();

      var server = new TServerSocketTransport(fakeSocket);

      // Written bytes must not have been handed to the socket before flush.
      server.writeAll(responseBytes);
      expect(fakeSocket.sendPayload, isNull);

      server.flush();

      // Yield once so that queued microtasks get a chance to run.
      await new Future.value();

      expect(fakeSocket.sendPayload, isNotNull);
      expect(fakeSocket.sendPayload, responseBytes);
    });
  }, timeout: oneSecond);
}
238 | ||
/// In-memory [TSocket] used by the transport tests.
///
/// Nothing is sent over a network: [send] only remembers the most recent
/// payload (readable through [sendPayload]), and incoming traffic is injected
/// by calling [receiveFakeMessage].
class FakeSocket extends TSocket {
  final StreamController<TSocketState> _onStateController;

  /// Emits [TSocketState.OPEN] and [TSocketState.CLOSED] from [open]/[close].
  Stream<TSocketState> get onState => _onStateController.stream;

  final StreamController<Object> _onErrorController;

  /// Error stream; this fake never adds anything to it.
  Stream<Object> get onError => _onErrorController.stream;

  final StreamController<Uint8List> _onMessageController;

  /// Emits every message injected through [receiveFakeMessage].
  Stream<Uint8List> get onMessage => _onMessageController.stream;

  /// Creates a closed socket backed by broadcast stream controllers.
  ///
  /// [sync] is forwarded to each `StreamController.broadcast`, so it selects
  /// whether listeners are notified synchronously from within `add`.
  FakeSocket({bool sync = false})
      : _onStateController = new StreamController.broadcast(sync: sync),
        _onErrorController = new StreamController.broadcast(sync: sync),
        _onMessageController = new StreamController.broadcast(sync: sync);

  // Starts out false so that isOpen/isClosed and the guards in send() and
  // receiveFakeMessage() behave correctly on a socket that was never opened,
  // instead of failing on a null boolean.
  bool _isOpen = false;

  /// Whether [open] has been called more recently than [close].
  bool get isOpen => _isOpen;

  /// The negation of [isOpen].
  bool get isClosed => !isOpen;

  /// Marks the socket open and publishes [TSocketState.OPEN] on [onState].
  Future open() async {
    _isOpen = true;
    _onStateController.add(TSocketState.OPEN);
  }

  /// Marks the socket closed and publishes [TSocketState.CLOSED] on [onState].
  Future close() async {
    _isOpen = false;
    _onStateController.add(TSocketState.CLOSED);
  }

  Uint8List _sendPayload;

  /// The data passed to the most recent [send] call, or null if none yet.
  Uint8List get sendPayload => _sendPayload;

  /// Records [data] as the latest payload.
  ///
  /// Throws a [StateError] if the socket is not open.
  void send(Uint8List data) {
    if (!isOpen) throw new StateError('The socket is not open');

    _sendPayload = data;
  }

  /// Decodes [base64text] and publishes the resulting bytes on [onMessage].
  ///
  /// Throws a [StateError] if the socket is not open.
  void receiveFakeMessage(String base64text) {
    if (!isOpen) throw new StateError('The socket is not open');

    var message = new Uint8List.fromList(base64.decode(base64text));
    _onMessageController.add(message);
  }
}
286 | ||
/// Protocol factory whose protocols all report a test-controlled message.
class FakeProtocolFactory implements TProtocolFactory {
  /// The message handed to each [FakeProtocol] created by [getProtocol].
  ///
  /// Tests reassign this between steps; the value is captured at the moment
  /// [getProtocol] is called.
  TMessage message;

  FakeProtocolFactory();

  /// Returns a new [FakeProtocol] for the current [message].
  ///
  /// The [transport] argument is not used.
  getProtocol(TTransport transport) {
    return new FakeProtocol(message);
  }
}
294 | ||
/// Mock [TProtocol] whose [readMessageBegin] returns a fixed message.
///
/// Every other member falls through to the [Mock] base class.
class FakeProtocol extends Mock implements TProtocol {
  TMessage _message;

  /// Creates a protocol that will report [_message] as the message header.
  FakeProtocol(this._message);

  /// Returns the message supplied at construction time.
  readMessageBegin() {
    return _message;
  }
}
302 | ||
/// Builds a framed copy of [responseBytes].
///
/// The result starts with `TFramedTransport.headerByteCount` header bytes,
/// into which the payload length is written as a 32-bit integer at offset 0
/// (big-endian, the `ByteData.setInt32` default), followed by the payload.
Uint8List _getFramedResponse(Uint8List responseBytes) {
  final headerLength = TFramedTransport.headerByteCount;
  final payloadLength = responseBytes.length;

  final framed = new Uint8List(headerLength + payloadLength);

  // Header first, then the payload directly after it.
  framed.buffer.asByteData().setInt32(0, payloadLength);
  framed.setRange(headerLength, headerLength + payloadLength, responseBytes);

  return framed;
}