// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Thrift.Transports.Client
{
    // ReSharper disable once InconsistentNaming
    /// <summary>
    ///     Client transport decorator that places an in-memory buffer in front of another
    ///     <see cref="TClientTransport" />. Reads are served from an input buffer that is refilled
    ///     from the wrapped transport when it runs dry; small writes are collected in an output
    ///     buffer and handed to the wrapped transport once the buffer is full or
    ///     <see cref="FlushAsync" /> is called.
    /// </summary>
    public class TBufferedClientTransport : TClientTransport
    {
        // Size, in bytes, that both internal buffers are grown to before first use.
        private readonly int _bufSize;

        // Holds bytes already fetched from the wrapped transport but not yet handed to the caller.
        private readonly MemoryStream _inputBuffer = new MemoryStream(0);

        // Holds bytes accepted from the caller but not yet written to the wrapped transport.
        private readonly MemoryStream _outputBuffer = new MemoryStream(0);

        // The wrapped transport that performs the actual I/O.
        private readonly TClientTransport _transport;

        private bool _isDisposed;

        //TODO: should support only specified input transport?
        /// <summary>
        ///     Creates a buffered wrapper around <paramref name="transport" />.
        /// </summary>
        /// <param name="transport">Transport to wrap; must not be <c>null</c>.</param>
        /// <param name="bufSize">Size in bytes of the read and write buffers; must be positive.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufSize" /> is zero or negative.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="transport" /> is <c>null</c>.</exception>
        public TBufferedClientTransport(TClientTransport transport, int bufSize = 1024)
        {
            if (bufSize <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(bufSize), "Buffer size must be a positive number.");
            }

            _transport = transport ?? throw new ArgumentNullException(nameof(transport));
            _bufSize = bufSize;
        }

        /// <summary>
        ///     Gets the wrapped transport.
        /// </summary>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public TClientTransport UnderlyingTransport
        {
            get
            {
                CheckNotDisposed();

                return _transport;
            }
        }

        /// <summary>
        ///     <c>true</c> while this instance is not disposed and the wrapped transport reports open.
        /// </summary>
        public override bool IsOpen => !_isDisposed && _transport.IsOpen;

        /// <summary>
        ///     Opens the wrapped transport.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to the wrapped transport.</param>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public override async Task OpenAsync(CancellationToken cancellationToken)
        {
            CheckNotDisposed();

            await _transport.OpenAsync(cancellationToken);
        }

        /// <summary>
        ///     Closes the wrapped transport. Buffered output is not flushed here.
        /// </summary>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public override void Close()
        {
            CheckNotDisposed();

            _transport.Close();
        }

        /// <summary>
        ///     Reads up to <paramref name="length" /> bytes into <paramref name="buffer" />, serving
        ///     them from the input buffer and refilling that buffer from the wrapped transport
        ///     when it is empty.
        /// </summary>
        /// <param name="buffer">Destination array.</param>
        /// <param name="offset">Index in <paramref name="buffer" /> at which to start storing bytes.</param>
        /// <param name="length">Maximum number of bytes to read.</param>
        /// <param name="cancellationToken">Token forwarded to the buffer and the wrapped transport.</param>
        /// <returns>
        ///     The number of bytes stored in <paramref name="buffer" />; <c>0</c> when
        ///     <paramref name="length" /> is zero or the wrapped transport returned no data.
        /// </returns>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        /// <exception cref="TTransportException">The transport is not open.</exception>
        public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
            CancellationToken cancellationToken)
        {
            //TODO: investigate how it should work correctly
            CheckNotDisposed();

            // ValidateBufferArgs is defined outside this file (presumably on the base class).
            ValidateBufferArgs(buffer, offset, length);

            if (!IsOpen)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            // A zero-length request must not be mistaken for "input buffer exhausted":
            // that would overwrite unread buffered bytes with a fresh transport read.
            if (length == 0)
            {
                return 0;
            }

            if (_inputBuffer.Capacity < _bufSize)
            {
                _inputBuffer.Capacity = _bufSize;
            }

            var got = await _inputBuffer.ReadAsync(buffer, offset, length, cancellationToken);
            if (got > 0)
            {
                return got;
            }

            // Input buffer is drained: rewind it and expose its whole capacity as the
            // destination for one read from the wrapped transport.
            _inputBuffer.Seek(0, SeekOrigin.Begin);
            _inputBuffer.SetLength(_inputBuffer.Capacity);

            ArraySegment<byte> bufSegment;
            _inputBuffer.TryGetBuffer(out bufSegment);

            var filled = 0;
            try
            {
                filled = await _transport.ReadAsync(bufSegment.Array, bufSegment.Offset, bufSegment.Count,
                    cancellationToken);
            }
            finally
            {
                // Shrink to what was actually received. On failure or cancellation this
                // leaves the buffer empty instead of full of stale bytes.
                _inputBuffer.SetLength(filled);
            }

            if (filled == 0)
            {
                return 0;
            }

            // The stream position is still 0, so this serves the freshly buffered bytes.
            return await _inputBuffer.ReadAsync(buffer, offset, length, cancellationToken);
        }

        /// <summary>
        ///     Writes <paramref name="length" /> bytes from <paramref name="buffer" />. Data first
        ///     tops up a partially filled output buffer (which is sent once full), then whole
        ///     buffer-sized chunks go straight to the wrapped transport, and any remainder is
        ///     kept in the output buffer until the next write or flush.
        /// </summary>
        /// <param name="buffer">Source array.</param>
        /// <param name="offset">Index in <paramref name="buffer" /> of the first byte to write.</param>
        /// <param name="length">Number of bytes to write.</param>
        /// <param name="cancellationToken">Token forwarded to the buffer and the wrapped transport.</param>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        /// <exception cref="TTransportException">The transport is not open.</exception>
        public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            CheckNotDisposed();

            // ValidateBufferArgs is defined outside this file (presumably on the base class).
            ValidateBufferArgs(buffer, offset, length);

            if (!IsOpen)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            // Number of bytes consumed so far, relative to the "offset" argument.
            var writtenCount = 0;
            if (_outputBuffer.Length > 0)
            {
                // Top up the partially filled buffer first so bytes stay in order.
                var capa = (int) (_outputBuffer.Capacity - _outputBuffer.Length);
                var writeSize = Math.Min(capa, length);
                await _outputBuffer.WriteAsync(buffer, offset, writeSize, cancellationToken);

                writtenCount += writeSize;
                if (writeSize == capa)
                {
                    // Buffer is now full: push it to the wrapped transport.
                    await FlushOutputBufferAsync(cancellationToken);
                }
            }

            // Whole buffer-sized chunks bypass the output buffer.
            while (length - writtenCount >= _bufSize)
            {
                await _transport.WriteAsync(buffer, offset + writtenCount, _bufSize, cancellationToken);
                writtenCount += _bufSize;
            }

            var remain = length - writtenCount;
            if (remain > 0)
            {
                if (_outputBuffer.Capacity < _bufSize)
                {
                    _outputBuffer.Capacity = _bufSize;
                }

                await _outputBuffer.WriteAsync(buffer, offset + writtenCount, remain, cancellationToken);
            }
        }

        /// <summary>
        ///     Sends any buffered output to the wrapped transport and then flushes that transport.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to the wrapped transport.</param>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        /// <exception cref="TTransportException">The transport is not open.</exception>
        public override async Task FlushAsync(CancellationToken cancellationToken)
        {
            CheckNotDisposed();

            if (!IsOpen)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            await FlushOutputBufferAsync(cancellationToken);

            await _transport.FlushAsync(cancellationToken);
        }

        // Writes the pending bytes of the output buffer to the wrapped transport and empties the buffer.
        private async Task FlushOutputBufferAsync(CancellationToken cancellationToken)
        {
            if (_outputBuffer.Length == 0)
            {
                return;
            }

            ArraySegment<byte> pending;
            if (_outputBuffer.TryGetBuffer(out pending))
            {
                // Only Offset..Offset+Count holds valid data; the backing array is Capacity long.
                await _transport.WriteAsync(pending.Array, pending.Offset, pending.Count, cancellationToken);
            }
            else
            {
                // Backing array not exposed: fall back to a copy of the valid bytes.
                await _transport.WriteAsync(_outputBuffer.ToArray(), cancellationToken);
            }

            _outputBuffer.SetLength(0);
        }

        // Throws ObjectDisposedException, naming this object's type, once Dispose has run.
        private void CheckNotDisposed()
        {
            if (_isDisposed)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
        }

        // IDisposable
        /// <summary>
        ///     Disposes both buffers and the wrapped transport on the first disposing call and
        ///     marks this instance as disposed.
        /// </summary>
        /// <param name="disposing"><c>true</c> to release the managed resources held by this instance.</param>
        protected override void Dispose(bool disposing)
        {
            if (!_isDisposed && disposing)
            {
                _inputBuffer?.Dispose();
                _outputBuffer?.Dispose();
                _transport?.Dispose();
            }

            _isDisposed = true;
        }
    }
}