git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / dart / test / transport / t_socket_transport_test.dart
CommitLineData
f67539c2
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18library thrift.test.transport.t_socket_transport_test;
19
20import 'dart:async';
21import 'dart:convert' show Utf8Codec;
22import 'dart:typed_data' show Uint8List;
23
24import 'package:dart2_constant/convert.dart' show base64;
25import 'package:dart2_constant/core.dart' as core;
26import 'package:mockito/mockito.dart';
27import 'package:test/test.dart';
28import 'package:thrift/thrift.dart';
29
/// Registers the socket-transport test groups.
///
/// Every group drives a transport through a [FakeSocket], so no real network
/// connection is opened: outgoing bytes are inspected via
/// `FakeSocket.sendPayload` and incoming bytes are injected with
/// `FakeSocket.receiveFakeMessage`.
void main() {
  const utf8Codec = const Utf8Codec();

  // Request fixture: the text, its UTF-8 bytes and the base64 form that
  // FakeSocket.receiveFakeMessage takes as input.
  final outgoingText = 'my test request';
  final outgoingBytes = new Uint8List.fromList(utf8Codec.encode(outgoingText));
  final outgoingBase64 = base64.encode(outgoingBytes);

  // Response fixture, in the same three forms.
  final replyText = 'response 1';
  final replyBytes = new Uint8List.fromList(utf8Codec.encode(replyText));
  final replyBase64 = base64.encode(replyBytes);

  // The same response wrapped by _getFramedResponse, for the framed groups.
  final framedReplyBase64 = base64.encode(_getFramedResponse(replyBytes));

  // Every group runs under the same one-second limit.
  final groupTimeout = new Timeout(new Duration(seconds: 1));

  // Reads `replyBytes.length` bytes from [source] and decodes them as UTF-8.
  String readReplyText(TTransport source) {
    final received = new Uint8List(replyBytes.length);
    source.readAll(received, 0, replyBytes.length);
    return utf8Codec.decode(received);
  }

  group('TClientSocketTransport', () {
    FakeSocket fakeSocket;
    TTransport client;

    setUp(() async {
      fakeSocket = new FakeSocket(sync: false);
      await fakeSocket.open();
      client = new TClientSocketTransport(fakeSocket);
      await client.open();
      client.writeAll(outgoingBytes);
    });

    test('Test client sending data over transport', () async {
      // Nothing has been flushed yet, so the socket has seen no payload.
      expect(fakeSocket.sendPayload, isNull);

      final flushed = client.flush();

      // Let pending microtask events finish before inspecting the socket.
      await new Future.value();

      expect(fakeSocket.sendPayload, isNotNull);
      expect(fakeSocket.sendPayload, outgoingBytes);

      // Simulate the peer answering.
      fakeSocket.receiveFakeMessage(replyBase64);

      await flushed;
      expect(readReplyText(client), replyText);
    });
  }, timeout: groupTimeout);

  group('TClientSocketTransport with FramedTransport', () {
    FakeSocket fakeSocket;
    TTransport client;

    setUp(() async {
      fakeSocket = new FakeSocket(sync: true);
      await fakeSocket.open();

      client = new TFramedTransport(new TClientSocketTransport(fakeSocket));
      await client.open();
      client.writeAll(outgoingBytes);
    });

    test('Test client sending data over framed transport', () async {
      String decoded;

      // Read the reply as soon as the flush future completes.
      final flushed = client.flush().then((_) {
        decoded = readReplyText(client);
      });

      // Simulate the peer answering with a framed message.
      fakeSocket.receiveFakeMessage(framedReplyBase64);

      await flushed;
      expect(decoded, replyText);
    });
  }, timeout: groupTimeout);

  group('TAsyncClientSocketTransport', () {
    FakeSocket fakeSocket;
    FakeProtocolFactory fakeProtocols;
    TTransport client;

    setUp(() async {
      fakeSocket = new FakeSocket(sync: true);
      await fakeSocket.open();

      // The fake protocol reports this message for the outgoing request.
      fakeProtocols = new FakeProtocolFactory()
        ..message = new TMessage('foo', TMessageType.CALL, 123);
      client = new TAsyncClientSocketTransport(
          fakeSocket, new TMessageReader(fakeProtocols),
          responseTimeout: core.Duration.zero);
      await client.open();
      client.writeAll(outgoingBytes);
    });

    test('Test response correlates to correct request', () async {
      String decoded;

      final flushed = client.flush().then((_) {
        decoded = readReplyText(client);
      });

      // First simulated response: a REPLY with the request's sequence id.
      fakeProtocols.message = new TMessage('foo', TMessageType.REPLY, 123);
      fakeSocket.receiveFakeMessage(replyBase64);

      // Second simulated response, under a different name and sequence id.
      final secondReplyBase64 = base64
          .encode(new Uint8List.fromList(utf8Codec.encode('response 2')));
      fakeProtocols.message = new TMessage('foo2', TMessageType.REPLY, 124);
      fakeSocket.receiveFakeMessage(secondReplyBase64);

      await flushed;
      expect(decoded, replyText);
    });

    test('Test response timeout', () async {
      // No response is ever injected, so the flush future is expected to
      // fail with a TimeoutException.
      final flushed = client.flush();
      expect(flushed, throwsA(new isInstanceOf<TimeoutException>()));
    });
  }, timeout: groupTimeout);

  group('TAsyncClientSocketTransport with TFramedTransport', () {
    FakeSocket fakeSocket;
    FakeProtocolFactory fakeProtocols;
    TTransport client;

    setUp(() async {
      fakeSocket = new FakeSocket(sync: true);
      await fakeSocket.open();

      fakeProtocols = new FakeProtocolFactory()
        ..message = new TMessage('foo', TMessageType.CALL, 123);
      // The reader is given the frame header size as its byte offset.
      final reader = new TMessageReader(fakeProtocols,
          byteOffset: TFramedTransport.headerByteCount);

      client = new TFramedTransport(new TAsyncClientSocketTransport(
          fakeSocket, reader,
          responseTimeout: core.Duration.zero));
      await client.open();
      client.writeAll(outgoingBytes);
    });

    test('Test async client sending data over framed transport', () async {
      String decoded;

      final flushed = client.flush().then((_) {
        decoded = readReplyText(client);
      });

      // Simulate the peer answering with a framed REPLY.
      fakeProtocols.message = new TMessage('foo', TMessageType.REPLY, 123);
      fakeSocket.receiveFakeMessage(framedReplyBase64);

      await flushed;
      expect(decoded, replyText);
    });
  }, timeout: groupTimeout);

  group('TServerTransport', () {
    test('Test server transport listens to socket', () async {
      final fakeSocket = new FakeSocket();
      await fakeSocket.open();
      expect(fakeSocket.isOpen, isTrue);

      final server = new TServerSocketTransport(fakeSocket);
      expect(server.hasReadData, isFalse);

      fakeSocket.receiveFakeMessage(outgoingBase64);

      // Let pending microtask events finish before checking the transport.
      await new Future.value();

      expect(server.hasReadData, isTrue);

      final received = new Uint8List(outgoingBytes.length);
      server.readAll(received, 0, outgoingBytes.length);

      expect(utf8Codec.decode(received), outgoingText);
    });

    test('Test server sending data over transport', () async {
      final fakeSocket = new FakeSocket();
      await fakeSocket.open();

      final server = new TServerSocketTransport(fakeSocket);

      // Written bytes must not reach the socket until flush() is called.
      server.writeAll(replyBytes);
      expect(fakeSocket.sendPayload, isNull);

      server.flush();

      // Let pending microtask events finish before inspecting the socket.
      await new Future.value();

      expect(fakeSocket.sendPayload, isNotNull);
      expect(fakeSocket.sendPayload, replyBytes);
    });
  }, timeout: groupTimeout);
}
238
/// In-memory [TSocket] used by the transport tests.
///
/// Nothing is sent over a network: [send] only records the last payload so a
/// test can inspect it through [sendPayload], and [receiveFakeMessage] pushes
/// a decoded message onto [onMessage] as if it had arrived from a peer.
class FakeSocket extends TSocket {
  final StreamController<TSocketState> _onStateController;

  /// Emits [TSocketState.OPEN] / [TSocketState.CLOSED] from [open] / [close].
  Stream<TSocketState> get onState => _onStateController.stream;

  final StreamController<Object> _onErrorController;

  /// Error stream; nothing in this class adds events to it.
  Stream<Object> get onError => _onErrorController.stream;

  final StreamController<Uint8List> _onMessageController;

  /// Emits each message injected with [receiveFakeMessage].
  Stream<Uint8List> get onMessage => _onMessageController.stream;

  /// Creates a closed fake socket.
  ///
  /// [sync] is forwarded to `StreamController.broadcast` for all three
  /// streams, so it selects whether listeners are notified synchronously.
  FakeSocket({bool sync: false})
      : _onStateController = new StreamController.broadcast(sync: sync),
        _onErrorController = new StreamController.broadcast(sync: sync),
        _onMessageController = new StreamController.broadcast(sync: sync);

  // Starts out closed. Without an initialiser this field would be null until
  // open() runs, so isOpen would return null and the `!isOpen` checks below
  // would fail on the negation instead of reporting a closed socket or
  // throwing the intended StateError.
  bool _isOpen = false;

  /// Whether [open] has been called more recently than [close].
  bool get isOpen => _isOpen;

  /// The negation of [isOpen].
  bool get isClosed => !isOpen;

  /// Marks the socket open and emits [TSocketState.OPEN] on [onState].
  Future open() async {
    _isOpen = true;
    _onStateController.add(TSocketState.OPEN);
  }

  /// Marks the socket closed and emits [TSocketState.CLOSED] on [onState].
  Future close() async {
    _isOpen = false;
    _onStateController.add(TSocketState.CLOSED);
  }

  Uint8List _sendPayload;

  /// The data passed to the most recent [send] call, or null if none yet.
  Uint8List get sendPayload => _sendPayload;

  /// Records [data] as the last sent payload.
  ///
  /// Throws a [StateError] if the socket is not open.
  void send(Uint8List data) {
    if (!isOpen) throw new StateError('The socket is not open');

    _sendPayload = data;
  }

  /// Decodes [base64text] and emits the resulting bytes on [onMessage].
  ///
  /// Throws a [StateError] if the socket is not open.
  void receiveFakeMessage(String base64text) {
    if (!isOpen) throw new StateError('The socket is not open');

    var message = new Uint8List.fromList(base64.decode(base64text));
    _onMessageController.add(message);
  }
}
286
/// [TProtocolFactory] whose protocols all report a test-supplied [message].
class FakeProtocolFactory implements TProtocolFactory {
  /// Message handed to each [FakeProtocol] built by [getProtocol].
  ///
  /// Tests reassign this between calls to change what the next protocol
  /// reports.
  TMessage message;

  FakeProtocolFactory();

  /// Returns a new [FakeProtocol] bound to the current [message].
  ///
  /// [transport] is not used.
  getProtocol(TTransport transport) {
    return new FakeProtocol(message);
  }
}
294
/// Mock [TProtocol] whose [readMessageBegin] returns a fixed message.
class FakeProtocol extends Mock implements TProtocol {
  // The message returned by every readMessageBegin() call.
  TMessage _message;

  /// Creates a protocol that always reports [message].
  FakeProtocol(TMessage message) : _message = message;

  /// Returns the message supplied at construction.
  readMessageBegin() {
    return _message;
  }
}
302
/// Builds a framed copy of [responseBytes].
///
/// The result is `TFramedTransport.headerByteCount` header bytes followed by
/// the payload. The payload length is written at offset 0 as a 32-bit
/// integer using `ByteData.setInt32`'s default byte order.
Uint8List _getFramedResponse(Uint8List responseBytes) {
  final headerLength = TFramedTransport.headerByteCount;
  final payloadLength = responseBytes.length;
  final framed = new Uint8List(headerLength + payloadLength);

  // Length prefix first, then the payload right after the header.
  framed.buffer.asByteData().setInt32(0, payloadLength);
  framed.setRange(headerLength, headerLength + payloadLength, responseBytes);

  return framed;
}