]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /// Licensed to the Apache Software Foundation (ASF) under one |
2 | /// or more contributor license agreements. See the NOTICE file | |
3 | /// distributed with this work for additional information | |
4 | /// regarding copyright ownership. The ASF licenses this file | |
5 | /// to you under the Apache License, Version 2.0 (the | |
6 | /// "License"); you may not use this file except in compliance | |
7 | /// with the License. You may obtain a copy of the License at | |
8 | /// | |
9 | /// http://www.apache.org/licenses/LICENSE-2.0 | |
10 | /// | |
11 | /// Unless required by applicable law or agreed to in writing, | |
12 | /// software distributed under the License is distributed on an | |
13 | /// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | /// KIND, either express or implied. See the License for the | |
15 | /// specific language governing permissions and limitations | |
16 | /// under the License. | |
17 | ||
18 | part of thrift; | |
19 | ||
20 | /// Socket implementation of [TTransport]. | |
21 | /// | |
22 | /// For example: | |
23 | /// | |
24 | /// var transport = new TClientSocketTransport(new TWebSocket(url)); | |
25 | /// var protocol = new TBinaryProtocol(transport); | |
26 | /// var client = new MyThriftServiceClient(protocol); | |
27 | /// var result = client.myMethod(); | |
28 | /// | |
29 | /// Adapted from the JS WebSocket transport. | |
30 | abstract class TSocketTransport extends TBufferedTransport { | |
31 | final Logger logger = new Logger('thrift.TSocketTransport'); | |
32 | ||
33 | final TSocket socket; | |
34 | ||
35 | /// A transport using the provided [socket]. | |
36 | TSocketTransport(this.socket) { | |
37 | if (socket == null) { | |
38 | throw new ArgumentError.notNull('socket'); | |
39 | } | |
40 | ||
41 | socket.onError.listen((e) => logger.warning(e)); | |
42 | socket.onMessage.listen(handleIncomingMessage); | |
43 | } | |
44 | ||
45 | bool get isOpen => socket.isOpen; | |
46 | ||
47 | Future open() { | |
48 | _reset(isOpen: true); | |
49 | return socket.open(); | |
50 | } | |
51 | ||
52 | Future close() { | |
53 | _reset(isOpen: false); | |
54 | return socket.close(); | |
55 | } | |
56 | ||
57 | /// Make an incoming message available to read from the transport. | |
58 | void handleIncomingMessage(Uint8List messageBytes) { | |
59 | _setReadBuffer(messageBytes); | |
60 | } | |
61 | } | |
62 | ||
63 | /// [TClientSocketTransport] is a basic client socket transport. It sends | |
64 | /// outgoing messages and expects a response. | |
65 | /// | |
66 | /// NOTE: This transport expects a single threaded server, as it will process | |
67 | /// responses in FIFO order. | |
68 | class TClientSocketTransport extends TSocketTransport { | |
69 | final List<Completer<Uint8List>> _completers = []; | |
70 | ||
71 | TClientSocketTransport(TSocket socket) : super(socket); | |
72 | ||
73 | Future flush() { | |
74 | Uint8List bytes = consumeWriteBuffer(); | |
75 | ||
76 | // Use a sync completer to ensure that the buffer can be read immediately | |
77 | // after the read buffer is set, and avoid a race condition where another | |
78 | // response could overwrite the read buffer. | |
79 | var completer = new Completer<Uint8List>.sync(); | |
80 | _completers.add(completer); | |
81 | ||
82 | if (bytes.lengthInBytes > 0) { | |
83 | socket.send(bytes); | |
84 | } | |
85 | ||
86 | return completer.future; | |
87 | } | |
88 | ||
89 | void handleIncomingMessage(Uint8List messageBytes) { | |
90 | super.handleIncomingMessage(messageBytes); | |
91 | ||
92 | if (_completers.isNotEmpty) { | |
93 | var completer = _completers.removeAt(0); | |
94 | completer.complete(); | |
95 | } | |
96 | } | |
97 | } | |
98 | ||
99 | /// [TAsyncClientSocketTransport] sends outgoing messages and expects an | |
100 | /// asynchronous response. | |
101 | /// | |
102 | /// NOTE: This transport uses a [MessageReader] to read a [TMessage] when an | |
103 | /// incoming message arrives to correlate a response to a request, using the | |
104 | /// seqid. | |
105 | class TAsyncClientSocketTransport extends TSocketTransport { | |
106 | static const defaultTimeout = const Duration(seconds: 30); | |
107 | ||
108 | final Map<int, Completer<Uint8List>> _completers = {}; | |
109 | ||
110 | final TMessageReader messageReader; | |
111 | ||
112 | final Duration responseTimeout; | |
113 | ||
114 | TAsyncClientSocketTransport(TSocket socket, TMessageReader messageReader, | |
115 | {Duration responseTimeout: defaultTimeout}) | |
116 | : this.messageReader = messageReader, | |
117 | this.responseTimeout = responseTimeout, | |
118 | super(socket); | |
119 | ||
120 | Future flush() { | |
121 | Uint8List bytes = consumeWriteBuffer(); | |
122 | TMessage message = messageReader.readMessage(bytes); | |
123 | int seqid = message.seqid; | |
124 | ||
125 | // Use a sync completer to ensure that the buffer can be read immediately | |
126 | // after the read buffer is set, and avoid a race condition where another | |
127 | // response could overwrite the read buffer. | |
128 | var completer = new Completer<Uint8List>.sync(); | |
129 | _completers[seqid] = completer; | |
130 | ||
131 | if (responseTimeout != null) { | |
132 | new Future.delayed(responseTimeout, () { | |
133 | var completer = _completers.remove(seqid); | |
134 | if (completer != null) { | |
135 | completer.completeError( | |
136 | new TimeoutException("Response timed out.", responseTimeout)); | |
137 | } | |
138 | }); | |
139 | } | |
140 | ||
141 | socket.send(bytes); | |
142 | ||
143 | return completer.future; | |
144 | } | |
145 | ||
146 | void handleIncomingMessage(Uint8List messageBytes) { | |
147 | super.handleIncomingMessage(messageBytes); | |
148 | ||
149 | TMessage message = messageReader.readMessage(messageBytes); | |
150 | var completer = _completers.remove(message.seqid); | |
151 | if (completer != null) { | |
152 | completer.complete(); | |
153 | } | |
154 | } | |
155 | } | |
156 | ||
157 | /// [TServerSocketTransport] listens for incoming messages. When it sends a | |
158 | /// response, it does not expect an acknowledgement. | |
159 | class TServerSocketTransport extends TSocketTransport { | |
160 | final StreamController _onIncomingMessageController; | |
161 | Stream get onIncomingMessage => _onIncomingMessageController.stream; | |
162 | ||
163 | TServerSocketTransport(TSocket socket) | |
164 | : _onIncomingMessageController = new StreamController.broadcast(), | |
165 | super(socket); | |
166 | ||
167 | Future flush() async { | |
168 | Uint8List message = consumeWriteBuffer(); | |
169 | socket.send(message); | |
170 | } | |
171 | ||
172 | void handleIncomingMessage(Uint8List messageBytes) { | |
173 | super.handleIncomingMessage(messageBytes); | |
174 | ||
175 | _onIncomingMessageController.add(null); | |
176 | } | |
177 | } |