]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TFramedClientTransport.cs
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / netcore / Thrift / Transports / Client / TFramedClientTransport.cs
CommitLineData
f67539c2
TL
1// Licensed to the Apache Software Foundation(ASF) under one
2// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
19using System.IO;
20using System.Threading;
21using System.Threading.Tasks;
22
23namespace Thrift.Transports.Client
24{
25 //TODO: check for correct implementation
26
27 // ReSharper disable once InconsistentNaming
/// <summary>
///     Client transport that wraps another <see cref="TClientTransport" /> and exchanges data in
///     length-prefixed frames. Outgoing bytes are collected in an in-memory buffer and sent on
///     <see cref="FlushAsync" /> as a single message preceded by a 4-byte big-endian payload length.
///     Incoming data is read one whole frame at a time into an in-memory buffer and served from there.
/// </summary>
public class TFramedClientTransport : TClientTransport
{
    // Size in bytes of the frame length prefix.
    private const int HeaderSize = 4;

    // Scratch space for the incoming frame header.
    private readonly byte[] _headerBuf = new byte[HeaderSize];

    // Holds the payload of the most recently received frame; its position marks how much was consumed.
    private readonly MemoryStream _readBuffer = new MemoryStream(1024);

    // The wrapped transport that actually moves the bytes.
    private readonly TClientTransport _transport;

    // Holds the reserved header space followed by the payload written since the last flush.
    private readonly MemoryStream _writeBuffer = new MemoryStream(1024);

    private bool _isDisposed;

    /// <summary>
    ///     Creates a framed transport on top of <paramref name="transport" />.
    /// </summary>
    /// <param name="transport">The underlying transport; it is disposed together with this instance.</param>
    /// <exception cref="ArgumentNullException"><paramref name="transport" /> is <c>null</c>.</exception>
    public TFramedClientTransport(TClientTransport transport)
    {
        _transport = transport ?? throw new ArgumentNullException(nameof(transport));

        InitWriteBuffer();
    }

    /// <summary>
    ///     <c>true</c> while this instance has not been disposed and the underlying transport reports open.
    /// </summary>
    public override bool IsOpen => !_isDisposed && _transport.IsOpen;

    /// <summary>
    ///     Opens the underlying transport.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public override async Task OpenAsync(CancellationToken cancellationToken)
    {
        CheckNotDisposed();

        await _transport.OpenAsync(cancellationToken);
    }

    /// <summary>
    ///     Closes the underlying transport.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public override void Close()
    {
        CheckNotDisposed();

        _transport.Close();
    }

    /// <summary>
    ///     Reads up to <paramref name="length" /> bytes of frame payload into <paramref name="buffer" />.
    ///     Data is served from the currently buffered frame; when that frame is exhausted, the next
    ///     frame is fetched from the underlying transport first.
    /// </summary>
    /// <returns>The number of bytes copied into <paramref name="buffer" />.</returns>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    /// <exception cref="TTransportException">The transport is not open, or a frame header is invalid.</exception>
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
        CancellationToken cancellationToken)
    {
        CheckNotDisposed();

        ValidateBufferArgs(buffer, offset, length);

        if (!IsOpen)
        {
            throw new TTransportException(TTransportException.ExceptionType.NotOpen);
        }

        // A zero-length request must not be mistaken for "frame exhausted": falling through would
        // overwrite any unread bytes of the current frame and block waiting for a new one.
        if (length == 0)
        {
            return 0;
        }

        var got = await _readBuffer.ReadAsync(buffer, offset, length, cancellationToken);
        if (got > 0)
        {
            return got;
        }

        // Current frame is fully consumed: read another frame of data
        await ReadFrameAsync(cancellationToken);

        return await _readBuffer.ReadAsync(buffer, offset, length, cancellationToken);
    }

    /// <summary>
    ///     Reads one complete frame (header plus payload) from the underlying transport into the
    ///     read buffer and rewinds the buffer so the payload can be consumed from its start.
    /// </summary>
    /// <exception cref="TTransportException">The frame header encodes a negative size.</exception>
    private async Task ReadFrameAsync(CancellationToken cancellationToken)
    {
        await _transport.ReadAllAsync(_headerBuf, 0, HeaderSize, cancellationToken);

        var size = DecodeFrameSize(_headerBuf);
        if (size < 0)
        {
            // The header is untrusted wire data; report a corrupt length as a transport error
            // instead of letting MemoryStream.SetLength fail with an argument exception.
            throw new TTransportException(TTransportException.ExceptionType.Unknown,
                $"Invalid frame size ({size} bytes)");
        }

        _readBuffer.SetLength(size);
        _readBuffer.Seek(0, SeekOrigin.Begin);

        if (!_readBuffer.TryGetBuffer(out var bufSegment))
        {
            throw new InvalidOperationException("The read buffer does not expose its underlying array");
        }

        try
        {
            // Fill the stream's backing array directly to avoid an intermediate copy.
            await _transport.ReadAllAsync(bufSegment.Array, bufSegment.Offset, size, cancellationToken);
        }
        catch
        {
            // The payload is incomplete; empty the buffer so stale or partial bytes are never
            // handed out by a later ReadAsync call.
            _readBuffer.SetLength(0);
            throw;
        }
    }

    /// <summary>
    ///     Appends <paramref name="length" /> bytes from <paramref name="buffer" /> to the pending
    ///     frame. Nothing is sent to the underlying transport until <see cref="FlushAsync" /> runs,
    ///     except that the pending frame is flushed first when appending would push the buffer
    ///     past <see cref="int.MaxValue" /> bytes.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    /// <exception cref="TTransportException">The transport is not open.</exception>
    public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
    {
        CheckNotDisposed();

        ValidateBufferArgs(buffer, offset, length);

        if (!IsOpen)
        {
            throw new TTransportException(TTransportException.ExceptionType.NotOpen);
        }

        // MemoryStream.Length is a long, so this sum cannot overflow.
        if (_writeBuffer.Length + length > int.MaxValue)
        {
            await FlushAsync(cancellationToken);
        }

        await _writeBuffer.WriteAsync(buffer, offset, length, cancellationToken);
    }

    /// <summary>
    ///     Sends the pending frame (length header followed by the buffered payload) to the
    ///     underlying transport, resets the write buffer, and flushes the underlying transport.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    /// <exception cref="TTransportException">The transport is not open.</exception>
    /// <exception cref="InvalidOperationException">The write buffer is shorter than the reserved header.</exception>
    public override async Task FlushAsync(CancellationToken cancellationToken)
    {
        CheckNotDisposed();

        if (!IsOpen)
        {
            throw new TTransportException(TTransportException.ExceptionType.NotOpen);
        }

        // Snapshot of header placeholder + payload; the copy is handed to the inner transport.
        var buf = _writeBuffer.ToArray();

        var dataLen = (int) _writeBuffer.Length - HeaderSize;
        if (dataLen < 0)
        {
            // logic error actually: InitWriteBuffer always reserves HeaderSize bytes
            throw new InvalidOperationException("Write buffer is missing the reserved frame header space");
        }

        // Inject message header into the reserved buffer space
        EncodeFrameSize(dataLen, buf);

        // Send the entire message at once
        await _transport.WriteAsync(buf, cancellationToken);

        InitWriteBuffer();

        await _transport.FlushAsync(cancellationToken);
    }

    /// <summary>
    ///     Truncates the write buffer to just the header placeholder and positions it at the end,
    ///     so subsequent writes append payload after the reserved space.
    /// </summary>
    private void InitWriteBuffer()
    {
        // Reserve space for message header to be put right before sending it out
        _writeBuffer.SetLength(HeaderSize);
        _writeBuffer.Seek(0, SeekOrigin.End);
    }

    /// <summary>
    ///     Writes <paramref name="frameSize" /> into the first four bytes of <paramref name="buf" />
    ///     in big-endian order.
    /// </summary>
    private static void EncodeFrameSize(int frameSize, byte[] buf)
    {
        buf[0] = (byte) (0xff & (frameSize >> 24));
        buf[1] = (byte) (0xff & (frameSize >> 16));
        buf[2] = (byte) (0xff & (frameSize >> 8));
        buf[3] = (byte) (0xff & (frameSize));
    }

    /// <summary>
    ///     Reads a big-endian 32-bit integer from the first four bytes of <paramref name="buf" />.
    /// </summary>
    private static int DecodeFrameSize(byte[] buf)
    {
        return
            ((buf[0] & 0xff) << 24) |
            ((buf[1] & 0xff) << 16) |
            ((buf[2] & 0xff) << 8) |
            (buf[3] & 0xff);
    }

    /// <summary>
    ///     Throws <see cref="ObjectDisposedException" /> when this instance has already been disposed.
    /// </summary>
    private void CheckNotDisposed()
    {
        if (_isDisposed)
        {
            throw new ObjectDisposedException(nameof(TFramedClientTransport));
        }
    }

    // IDisposable
    /// <summary>
    ///     Releases both frame buffers and the wrapped transport on the first disposing call and
    ///     marks this instance as disposed.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                _readBuffer?.Dispose();
                _writeBuffer?.Dispose();
                _transport?.Dispose();
            }
        }
        _isDisposed = true;
    }
}
201}