]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | using System; | |
19 | using System.IO; | |
20 | using System.Threading; | |
21 | using System.Threading.Tasks; | |
22 | ||
namespace Thrift.Transports.Client
{
    // ReSharper disable once InconsistentNaming
    /// <summary>
    ///     Client transport that wraps another <see cref="TClientTransport" /> and stages
    ///     reads and writes in in-memory buffers of a configurable size.
    /// </summary>
    public class TBufferedClientTransport : TClientTransport
    {
        private readonly int _bufSize;
        private readonly MemoryStream _inputBuffer = new MemoryStream(0);
        private readonly MemoryStream _outputBuffer = new MemoryStream(0);
        private readonly TClientTransport _transport;
        private bool _isDisposed;

        //TODO: should support only specified input transport?
        /// <summary>
        ///     Creates a buffered wrapper around <paramref name="transport" />.
        /// </summary>
        /// <param name="transport">The transport that is read from and written to.</param>
        /// <param name="bufSize">Size, in bytes, used for the input and output buffers.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufSize" /> is zero or negative.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="transport" /> is null.</exception>
        public TBufferedClientTransport(TClientTransport transport, int bufSize = 1024)
        {
            if (bufSize <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(bufSize), "Buffer size must be a positive number.");
            }

            _transport = transport ?? throw new ArgumentNullException(nameof(transport));
            _bufSize = bufSize;
        }

        /// <summary>
        ///     Gets the wrapped transport.
        /// </summary>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public TClientTransport UnderlyingTransport
        {
            get
            {
                CheckNotDisposed();

                return _transport;
            }
        }

        /// <summary>
        ///     True while this instance is not disposed and the wrapped transport reports itself open.
        /// </summary>
        public override bool IsOpen => !_isDisposed && _transport.IsOpen;

        /// <summary>
        ///     Opens the wrapped transport.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to the wrapped transport.</param>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public override async Task OpenAsync(CancellationToken cancellationToken)
        {
            CheckNotDisposed();

            await _transport.OpenAsync(cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        ///     Closes the wrapped transport. Bytes still held in the output buffer are not sent by this call.
        /// </summary>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public override void Close()
        {
            CheckNotDisposed();

            _transport.Close();
        }

        /// <summary>
        ///     Copies up to <paramref name="length" /> bytes into <paramref name="buffer" />, serving them
        ///     from the input buffer and refilling that buffer from the wrapped transport when it is empty.
        /// </summary>
        /// <param name="buffer">Destination array.</param>
        /// <param name="offset">Index in <paramref name="buffer" /> at which to start storing bytes.</param>
        /// <param name="length">Maximum number of bytes to copy.</param>
        /// <param name="cancellationToken">Token forwarded to the buffer and transport reads.</param>
        /// <returns>
        ///     The number of bytes copied; 0 when <paramref name="length" /> is 0 or when the wrapped
        ///     transport returned no bytes on refill.
        /// </returns>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        /// <exception cref="TTransportException">The transport is not open.</exception>
        public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
            CancellationToken cancellationToken)
        {
            //TODO: investigate how it should work correctly
            CheckNotDisposed();

            ValidateBufferArgs(buffer, offset, length);

            if (!IsOpen)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            // A zero-byte request must not fall through: "0 bytes copied" would be mistaken for
            // "input buffer exhausted", and the refill below would overwrite unread buffered bytes.
            if (length == 0)
            {
                return 0;
            }

            if (_inputBuffer.Capacity < _bufSize)
            {
                _inputBuffer.Capacity = _bufSize;
            }

            var got = await _inputBuffer.ReadAsync(buffer, offset, length, cancellationToken).ConfigureAwait(false);
            if (got > 0)
            {
                return got;
            }

            // Input buffer is drained: rewind it and expose its whole capacity as the refill target.
            _inputBuffer.Seek(0, SeekOrigin.Begin);
            _inputBuffer.SetLength(_inputBuffer.Capacity);

            _inputBuffer.TryGetBuffer(out var refillTarget);

            var filled = await _transport
                .ReadAsync(refillTarget.Array, refillTarget.Offset, refillTarget.Count, cancellationToken)
                .ConfigureAwait(false);

            // Shrink the logical length to what actually arrived so stale bytes are never served.
            _inputBuffer.SetLength(filled);

            if (filled == 0)
            {
                return 0;
            }

            // Serve the request from the freshly filled buffer (position is still at the start).
            return await _inputBuffer.ReadAsync(buffer, offset, length, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        ///     Writes <paramref name="length" /> bytes from <paramref name="buffer" />. Pending buffered
        ///     bytes are topped up and sent once the output buffer is full, whole buffer-sized chunks go
        ///     straight to the wrapped transport, and any remainder is kept in the output buffer.
        /// </summary>
        /// <param name="buffer">Source array.</param>
        /// <param name="offset">Index in <paramref name="buffer" /> of the first byte to write.</param>
        /// <param name="length">Number of bytes to write.</param>
        /// <param name="cancellationToken">Token forwarded to the buffer and transport writes.</param>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        /// <exception cref="TTransportException">The transport is not open.</exception>
        public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            CheckNotDisposed();

            ValidateBufferArgs(buffer, offset, length);

            if (!IsOpen)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            // Count of caller bytes consumed so far, relative to "offset".
            var writtenCount = 0;

            if (_outputBuffer.Length > 0)
            {
                // Top up the partially filled output buffer first so byte order is preserved.
                var free = (int) (_outputBuffer.Capacity - _outputBuffer.Length);
                var topUp = Math.Min(free, length);
                await _outputBuffer.WriteAsync(buffer, offset, topUp, cancellationToken).ConfigureAwait(false);

                writtenCount += topUp;
                if (topUp == free)
                {
                    // The output buffer is now full: hand it to the transport and start over.
                    await SendOutputBufferAsync(cancellationToken).ConfigureAwait(false);
                }
            }

            // Whole buffer-sized chunks bypass the output buffer.
            while (length - writtenCount >= _bufSize)
            {
                await _transport.WriteAsync(buffer, offset + writtenCount, _bufSize, cancellationToken)
                    .ConfigureAwait(false);
                writtenCount += _bufSize;
            }

            var remain = length - writtenCount;
            if (remain > 0)
            {
                if (_outputBuffer.Capacity < _bufSize)
                {
                    _outputBuffer.Capacity = _bufSize;
                }

                await _outputBuffer.WriteAsync(buffer, offset + writtenCount, remain, cancellationToken)
                    .ConfigureAwait(false);
            }
        }

        /// <summary>
        ///     Sends any bytes held in the output buffer to the wrapped transport and then flushes it.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to the wrapped transport.</param>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        /// <exception cref="TTransportException">The transport is not open.</exception>
        public override async Task FlushAsync(CancellationToken cancellationToken)
        {
            CheckNotDisposed();

            if (!IsOpen)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            if (_outputBuffer.Length > 0)
            {
                await SendOutputBufferAsync(cancellationToken).ConfigureAwait(false);
            }

            await _transport.FlushAsync(cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        ///     Writes the current content of the output buffer to the wrapped transport and empties the
        ///     buffer. The buffer is left untouched if the transport write throws.
        /// </summary>
        private async Task SendOutputBufferAsync(CancellationToken cancellationToken)
        {
            if (_outputBuffer.TryGetBuffer(out var pending))
            {
                // Send only the written window; the backing array is capacity-sized, not length-sized.
                await _transport.WriteAsync(pending.Array, pending.Offset, pending.Count, cancellationToken)
                    .ConfigureAwait(false);
            }
            else
            {
                // Backing array not exposed: fall back to a copy of exactly the written bytes.
                await _transport.WriteAsync(_outputBuffer.ToArray(), cancellationToken).ConfigureAwait(false);
            }

            _outputBuffer.SetLength(0);
        }

        /// <summary>
        ///     Throws <see cref="ObjectDisposedException" /> when this instance has been disposed.
        /// </summary>
        private void CheckNotDisposed()
        {
            if (_isDisposed)
            {
                // Report the disposed object (this class), not the name of a private field.
                throw new ObjectDisposedException(nameof(TBufferedClientTransport));
            }
        }

        // IDisposable
        /// <summary>
        ///     Disposes both buffers and the wrapped transport on the first disposing call and marks
        ///     this instance as disposed.
        /// </summary>
        /// <param name="disposing">True when called from an explicit dispose; managed members are released only then.</param>
        protected override void Dispose(bool disposing)
        {
            if (!_isDisposed && disposing)
            {
                _inputBuffer?.Dispose();
                _outputBuffer?.Dispose();
                _transport?.Dispose();
            }

            _isDisposed = true;
        }
    }
}