]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/dart/lib/src/transport/t_socket_transport.dart
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / dart / lib / src / transport / t_socket_transport.dart
CommitLineData
f67539c2
TL
/// Licensed to the Apache Software Foundation (ASF) under one
/// or more contributor license agreements. See the NOTICE file
/// distributed with this work for additional information
/// regarding copyright ownership. The ASF licenses this file
/// to you under the Apache License, Version 2.0 (the
/// "License"); you may not use this file except in compliance
/// with the License. You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing,
/// software distributed under the License is distributed on an
/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
/// KIND, either express or implied. See the License for the
/// specific language governing permissions and limitations
/// under the License.
17
18part of thrift;
19
/// Base [TTransport] implementation that exchanges messages over a [TSocket].
///
/// Typical client-side usage:
///
///     var transport = new TClientSocketTransport(new TWebSocket(url));
///     var protocol = new TBinaryProtocol(transport);
///     var client = new MyThriftServiceClient(protocol);
///     var result = client.myMethod();
///
/// Adapted from the JS WebSocket transport.
abstract class TSocketTransport extends TBufferedTransport {
  /// Logger used to report errors emitted by [socket].
  final Logger logger = new Logger('thrift.TSocketTransport');

  /// The socket this transport sends to and receives from.
  final TSocket socket;

  /// Creates a transport on top of [socket].
  ///
  /// Throws an [ArgumentError] when [socket] is null. Errors emitted by the
  /// socket are logged as warnings, and every message emitted by the socket
  /// is passed to [handleIncomingMessage].
  TSocketTransport(this.socket) {
    if (socket == null) throw new ArgumentError.notNull('socket');

    socket.onError.listen((error) {
      logger.warning(error);
    });
    socket.onMessage.listen(handleIncomingMessage);
  }

  /// Whether the underlying [socket] reports itself as open.
  bool get isOpen {
    return socket.isOpen;
  }

  /// Calls `_reset(isOpen: true)` and then opens the [socket].
  ///
  /// The returned future is the one produced by `socket.open()`.
  Future open() {
    _reset(isOpen: true);
    return socket.open();
  }

  /// Calls `_reset(isOpen: false)` and then closes the [socket].
  ///
  /// The returned future is the one produced by `socket.close()`.
  Future close() {
    _reset(isOpen: false);
    return socket.close();
  }

  /// Installs [messageBytes] as the read buffer so that the message can be
  /// read from this transport.
  ///
  /// Subclasses override this to add their own notification logic after the
  /// buffer has been set.
  void handleIncomingMessage(Uint8List messageBytes) {
    _setReadBuffer(messageBytes);
  }
}
62
/// A basic client socket transport: each [flush] sends the buffered request
/// and waits for a response.
///
/// NOTE: Responses are matched to pending flushes strictly in FIFO order, so
/// this transport expects a single threaded server.
class TClientSocketTransport extends TSocketTransport {
  /// Completers for flushes still awaiting a response, oldest first.
  final List<Completer<Uint8List>> _completers = [];

  /// Creates a client transport on top of [socket].
  TClientSocketTransport(TSocket socket) : super(socket);

  /// Sends the pending write buffer and returns a future that completes when
  /// this flush reaches the front of the queue and a message arrives.
  ///
  /// A completer is queued on every call; the socket send is skipped only
  /// when the consumed write buffer is empty.
  Future flush() {
    final Uint8List payload = consumeWriteBuffer();

    // The completer is synchronous on purpose: listeners must run right after
    // the read buffer is set, before a later response can overwrite it.
    final pending = new Completer<Uint8List>.sync();
    _completers.add(pending);

    if (payload.lengthInBytes > 0) {
      socket.send(payload);
    }

    return pending.future;
  }

  /// Stores [messageBytes] as the read buffer, then completes the oldest
  /// pending flush, if there is one.
  @override
  void handleIncomingMessage(Uint8List messageBytes) {
    super.handleIncomingMessage(messageBytes);

    if (_completers.isEmpty) return;

    final oldest = _completers.removeAt(0);
    oldest.complete();
  }
}
98
/// [TAsyncClientSocketTransport] sends outgoing messages and expects an
/// asynchronous response.
///
/// NOTE: This transport uses a [TMessageReader] to read the [TMessage] header
/// of both outgoing and incoming messages, and correlates a response to its
/// request using the seqid. Responses may therefore arrive in any order.
class TAsyncClientSocketTransport extends TSocketTransport {
  /// Timeout applied to each request unless another value is supplied.
  static const defaultTimeout = const Duration(seconds: 30);

  /// Completers for requests still awaiting a response, keyed by seqid.
  final Map<int, Completer<Uint8List>> _completers = {};

  /// Reader used to extract the [TMessage] header from raw message bytes.
  final TMessageReader messageReader;

  /// How long [flush] waits for a response before failing with a
  /// [TimeoutException]. When null, requests never time out.
  final Duration responseTimeout;

  /// Creates an asynchronous client transport on top of [socket].
  ///
  /// [messageReader] is used to read the seqid of every outgoing and incoming
  /// message. [responseTimeout] defaults to [defaultTimeout].
  TAsyncClientSocketTransport(TSocket socket, TMessageReader messageReader,
      {Duration responseTimeout = defaultTimeout})
      : this.messageReader = messageReader,
        this.responseTimeout = responseTimeout,
        super(socket);

  /// Sends the pending write buffer and returns a future that completes when
  /// a response carrying the same seqid arrives.
  ///
  /// If [responseTimeout] is non-null and no response has arrived when it
  /// elapses, the returned future completes with a [TimeoutException].
  Future flush() {
    Uint8List bytes = consumeWriteBuffer();
    TMessage message = messageReader.readMessage(bytes);
    int seqid = message.seqid;

    // Use a sync completer to ensure that the buffer can be read immediately
    // after the read buffer is set, and avoid a race condition where another
    // response could overwrite the read buffer.
    var completer = new Completer<Uint8List>.sync();
    _completers[seqid] = completer;

    if (responseTimeout != null) {
      new Future.delayed(responseTimeout, () {
        // The delayed callback always fires, even when the response arrived
        // long ago. It must only ever act on the completer it was created
        // for; otherwise a stale callback would fail a newer request that
        // happens to reuse the same seqid.
        if (completer.isCompleted) return;

        // Only drop the map entry if it still belongs to this request. A
        // later flush with the same seqid may have replaced it.
        if (identical(_completers[seqid], completer)) {
          _completers.remove(seqid);
        }

        completer.completeError(
            new TimeoutException("Response timed out.", responseTimeout));
      });
    }

    socket.send(bytes);

    return completer.future;
  }

  /// Stores [messageBytes] as the read buffer, then completes the pending
  /// request whose seqid matches the incoming message, if there is one.
  @override
  void handleIncomingMessage(Uint8List messageBytes) {
    super.handleIncomingMessage(messageBytes);

    TMessage message = messageReader.readMessage(messageBytes);
    var completer = _completers.remove(message.seqid);
    if (completer != null) {
      completer.complete();
    }
  }
}
156
/// A server-side socket transport that listens for incoming messages.
///
/// Responses written through [flush] are sent without waiting for any
/// acknowledgement.
class TServerSocketTransport extends TSocketTransport {
  /// Broadcast controller backing [onIncomingMessage].
  final StreamController _onIncomingMessageController;

  /// Creates a server transport on top of [socket].
  TServerSocketTransport(TSocket socket)
      : _onIncomingMessageController = new StreamController.broadcast(),
        super(socket);

  /// Emits a `null` event each time an incoming message has been placed in
  /// the read buffer.
  Stream get onIncomingMessage {
    return _onIncomingMessageController.stream;
  }

  /// Sends the pending write buffer over the [socket].
  ///
  /// The returned future does not wait for any reply.
  Future flush() async {
    Uint8List outgoing = consumeWriteBuffer();
    socket.send(outgoing);
  }

  /// Stores [messageBytes] as the read buffer, then notifies listeners of
  /// [onIncomingMessage].
  @override
  void handleIncomingMessage(Uint8List messageBytes) {
    super.handleIncomingMessage(messageBytes);

    _onIncomingMessageController.add(null);
  }
}