]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / jaegertracing / thrift / lib / netcore / Thrift / Transports / Client / TBufferedClientTransport.cs
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 using System;
19 using System.IO;
20 using System.Threading;
21 using System.Threading.Tasks;
22
23 namespace Thrift.Transports.Client
24 {
/// <summary>
///     Client transport that wraps another <see cref="TClientTransport"/> and places an
///     in-memory read buffer and an in-memory write buffer in front of it.
/// </summary>
/// <remarks>
///     Reads are served from the input buffer, which is refilled from the wrapped transport
///     with up to <c>bufSize</c> bytes at a time. Writes are collected in the output buffer;
///     the buffer is sent to the wrapped transport when it fills up or when
///     <see cref="FlushAsync"/> is called. Disposing this instance also disposes the wrapped
///     transport.
/// </remarks>
// ReSharper disable once InconsistentNaming
public class TBufferedClientTransport : TClientTransport
{
    private readonly int _bufSize;
    private readonly MemoryStream _inputBuffer = new MemoryStream(0);
    private readonly MemoryStream _outputBuffer = new MemoryStream(0);
    private readonly TClientTransport _transport;
    private bool _isDisposed;

    //TODO: should support only specified input transport?
    /// <summary>
    ///     Creates a buffered transport on top of <paramref name="transport"/>.
    /// </summary>
    /// <param name="transport">The transport that performs the actual I/O.</param>
    /// <param name="bufSize">Size, in bytes, of each of the two buffers. Must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufSize"/> is zero or negative.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="transport"/> is <c>null</c>.</exception>
    public TBufferedClientTransport(TClientTransport transport, int bufSize = 1024)
    {
        if (bufSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(bufSize), "Buffer size must be a positive number.");
        }

        _transport = transport ?? throw new ArgumentNullException(nameof(transport));
        _bufSize = bufSize;
    }

    /// <summary>
    ///     Gets the wrapped transport.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public TClientTransport UnderlyingTransport
    {
        get
        {
            CheckNotDisposed();

            return _transport;
        }
    }

    /// <summary>
    ///     <c>true</c> while this instance is not disposed and the wrapped transport reports itself open.
    /// </summary>
    public override bool IsOpen => !_isDisposed && _transport.IsOpen;

    /// <summary>
    ///     Opens the wrapped transport.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public override async Task OpenAsync(CancellationToken cancellationToken)
    {
        CheckNotDisposed();

        await _transport.OpenAsync(cancellationToken);
    }

    /// <summary>
    ///     Closes the wrapped transport. Data still held in the output buffer is not sent by
    ///     this method.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public override void Close()
    {
        CheckNotDisposed();

        _transport.Close();
    }

    /// <summary>
    ///     Reads up to <paramref name="length"/> bytes into <paramref name="buffer"/> starting at
    ///     <paramref name="offset"/>, refilling the input buffer from the wrapped transport when
    ///     it has been fully consumed.
    /// </summary>
    /// <returns>
    ///     The number of bytes copied, which may be less than <paramref name="length"/>; 0 when
    ///     <paramref name="length"/> is 0 or when the wrapped transport returned no data.
    /// </returns>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    /// <exception cref="TTransportException">The transport is not open.</exception>
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
        CancellationToken cancellationToken)
    {
        CheckNotDisposed();

        ValidateBufferArgs(buffer, offset, length);

        if (!IsOpen)
        {
            throw new TTransportException(TTransportException.ExceptionType.NotOpen);
        }

        // A zero-length read must not reach the refill path below: the buffer read would
        // return 0, be mistaken for "buffer exhausted", and the refill would overwrite any
        // bytes that are still unread.
        if (length == 0)
        {
            return 0;
        }

        if (_inputBuffer.Capacity < _bufSize)
        {
            _inputBuffer.Capacity = _bufSize;
        }

        var got = await _inputBuffer.ReadAsync(buffer, offset, length, cancellationToken);
        if (got > 0)
        {
            return got;
        }

        // The input buffer is exhausted: rewind it and expose its whole capacity so the
        // wrapped transport can read straight into the backing array.
        _inputBuffer.Seek(0, SeekOrigin.Begin);
        _inputBuffer.SetLength(_inputBuffer.Capacity);

        ArraySegment<byte> bufSegment;
        _inputBuffer.TryGetBuffer(out bufSegment);

        var filled = 0;
        try
        {
            filled = await _transport.ReadAsync(bufSegment.Array, 0, (int) _inputBuffer.Length, cancellationToken);
        }
        finally
        {
            // Shrink to what was actually received. If the read threw or was cancelled,
            // "filled" is still 0, so stale bytes are never presented as fresh data.
            _inputBuffer.SetLength(filled);
        }

        if (filled == 0)
        {
            return 0;
        }

        // Serve the request from the freshly filled buffer (position is 0, length is "filled").
        return await _inputBuffer.ReadAsync(buffer, offset, length, cancellationToken);
    }

    /// <summary>
    ///     Writes <paramref name="length"/> bytes from <paramref name="buffer"/> starting at
    ///     <paramref name="offset"/>. Data is first used to top up a partially filled output
    ///     buffer, then whole chunks of the configured buffer size are passed directly to the
    ///     wrapped transport, and any remainder is kept in the output buffer.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    /// <exception cref="TTransportException">The transport is not open.</exception>
    public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
    {
        CheckNotDisposed();

        ValidateBufferArgs(buffer, offset, length);

        if (!IsOpen)
        {
            throw new TTransportException(TTransportException.ExceptionType.NotOpen);
        }

        // Number of bytes consumed so far, relative to "offset".
        var writtenCount = 0;

        if (_outputBuffer.Length > 0)
        {
            // Top up the partially filled buffer first so bytes reach the transport in order.
            var free = (int) (_outputBuffer.Capacity - _outputBuffer.Length);
            var writeSize = Math.Min(free, length);
            await _outputBuffer.WriteAsync(buffer, offset, writeSize, cancellationToken);

            writtenCount += writeSize;
            if (writeSize == free)
            {
                // The buffer is now full: hand it to the wrapped transport.
                await SendOutputBufferAsync(cancellationToken);
            }
        }

        // Whole buffer-sized chunks bypass the output buffer.
        while (length - writtenCount >= _bufSize)
        {
            await _transport.WriteAsync(buffer, offset + writtenCount, _bufSize, cancellationToken);
            writtenCount += _bufSize;
        }

        // Whatever is left is smaller than the buffer size and is kept for later.
        var remain = length - writtenCount;
        if (remain > 0)
        {
            if (_outputBuffer.Capacity < _bufSize)
            {
                _outputBuffer.Capacity = _bufSize;
            }

            await _outputBuffer.WriteAsync(buffer, offset + writtenCount, remain, cancellationToken);
        }
    }

    /// <summary>
    ///     Sends any data held in the output buffer to the wrapped transport and then flushes
    ///     the wrapped transport.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    /// <exception cref="TTransportException">The transport is not open.</exception>
    public override async Task FlushAsync(CancellationToken cancellationToken)
    {
        CheckNotDisposed();

        if (!IsOpen)
        {
            throw new TTransportException(TTransportException.ExceptionType.NotOpen);
        }

        if (_outputBuffer.Length > 0)
        {
            await SendOutputBufferAsync(cancellationToken);
        }

        await _transport.FlushAsync(cancellationToken);
    }

    /// <summary>
    ///     Writes the current content of the output buffer to the wrapped transport and empties
    ///     the buffer. The buffer is left untouched if the write throws.
    /// </summary>
    private async Task SendOutputBufferAsync(CancellationToken cancellationToken)
    {
        ArraySegment<byte> pending;
        if (_outputBuffer.TryGetBuffer(out pending))
        {
            // Send only the valid range of the backing array; this avoids copying the data.
            await _transport.WriteAsync(pending.Array, pending.Offset, pending.Count, cancellationToken);
        }
        else
        {
            // Backing array not exposed: fall back to a copy of the buffered bytes.
            await _transport.WriteAsync(_outputBuffer.ToArray(), cancellationToken);
        }

        _outputBuffer.SetLength(0);
    }

    /// <summary>
    ///     Throws <see cref="ObjectDisposedException"/> if this instance has been disposed.
    /// </summary>
    private void CheckNotDisposed()
    {
        if (_isDisposed)
        {
            // Report the disposed object itself rather than the name of a private field.
            throw new ObjectDisposedException(nameof(TBufferedClientTransport));
        }
    }

    // IDisposable
    /// <summary>
    ///     Releases both buffers and the wrapped transport when <paramref name="disposing"/> is
    ///     <c>true</c>, and marks this instance as disposed. Subsequent calls do nothing further.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                _inputBuffer?.Dispose();
                _outputBuffer?.Dispose();
                _transport?.Dispose();
            }
        }
        _isDisposed = true;
    }
}
206 }