]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/dart/lib/src/console/t_web_socket.dart
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / dart / lib / src / console / t_web_socket.dart
1 /// Licensed to the Apache Software Foundation (ASF) under one
2 /// or more contributor license agreements. See the NOTICE file
3 /// distributed with this work for additional information
4 /// regarding copyright ownership. The ASF licenses this file
5 /// to you under the Apache License, Version 2.0 (the
6 /// "License"); you may not use this file except in compliance
7 /// with the License. You may obtain a copy of the License at
8 ///
9 /// http://www.apache.org/licenses/LICENSE-2.0
10 ///
11 /// Unless required by applicable law or agreed to in writing,
12 /// software distributed under the License is distributed on an
13 /// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 /// KIND, either express or implied. See the License for the
15 /// specific language governing permissions and limitations
16 /// under the License.
17
18 library thrift.src.console.t_web_socket;
19
20 import 'dart:async';
21 import 'package:dart2_constant/convert.dart' show base64;
22 import 'dart:io';
23 import 'dart:typed_data' show Uint8List;
24
25 import 'package:thrift/thrift.dart';
26
27 /// A [TSocket] backed by a [WebSocket] from dart:io
28 class TWebSocket implements TSocket {
29 final StreamController<TSocketState> _onStateController;
30 Stream<TSocketState> get onState => _onStateController.stream;
31
32 final StreamController<Object> _onErrorController;
33 Stream<Object> get onError => _onErrorController.stream;
34
35 final StreamController<Uint8List> _onMessageController;
36 Stream<Uint8List> get onMessage => _onMessageController.stream;
37
38 TWebSocket(WebSocket socket)
39 : _onStateController = new StreamController.broadcast(),
40 _onErrorController = new StreamController.broadcast(),
41 _onMessageController = new StreamController.broadcast() {
42 if (socket == null) {
43 throw new ArgumentError.notNull('socket');
44 }
45
46 _socket = socket;
47 _socket.listen(_onMessage, onError: _onError, onDone: close);
48 }
49
50 WebSocket _socket;
51
52 bool get isOpen => _socket != null;
53
54 bool get isClosed => _socket == null;
55
56 Future open() async {
57 _onStateController.add(TSocketState.OPEN);
58 }
59
60 Future close() async {
61 if (_socket != null) {
62 await _socket.close();
63 _socket = null;
64 }
65
66 _onStateController.add(TSocketState.CLOSED);
67 }
68
69 void send(Uint8List data) {
70 _socket.add(base64.encode(data));
71 }
72
73 void _onMessage(String message) {
74 try {
75 Uint8List data = new Uint8List.fromList(base64.decode(message));
76 _onMessageController.add(data);
77 } on FormatException catch (_) {
78 var error = new TProtocolError(TProtocolErrorType.INVALID_DATA,
79 "Expected a Base 64 encoded string.");
80 _onErrorController.add(error);
81 }
82 }
83
84 void _onError(Object error) {
85 close();
86 _onErrorController.add('$error');
87 }
88 }