]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransport.cs
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / netstd / Thrift / Transport / TTransport.cs
CommitLineData
f67539c2
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
19using System.Diagnostics;
20using System.IO;
21using System.Threading;
22using System.Threading.Tasks;
23
24namespace Thrift.Transport
25{
26 //TODO: think about client info
27 // ReSharper disable once InconsistentNaming
28 public abstract class TTransport : IDisposable
29 {
//TODO: think how to avoid peek byte
// One-byte holding area: PeekAsync reads ahead into slot 0 and ReadAllAsync
// hands that byte to the caller before reading anything further.
private readonly byte[] _peekBuffer = new byte[1];

// True while _peekBuffer[0] contains a byte that was read ahead but not yet consumed.
private bool _hasPeekByte;

/// <summary>
/// Gets a value indicating whether the transport is open. Supplied by derived classes;
/// <see cref="PeekAsync"/> refuses to read ahead when this is <c>false</c>.
/// </summary>
public abstract bool IsOpen { get; }
34
/// <summary>
/// Disposes the transport by calling <see cref="Dispose(bool)"/> with <c>true</c> and
/// then asking the garbage collector to skip finalization for this instance.
/// </summary>
public void Dispose()
{
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
40
/// <summary>
/// Determines whether at least one more byte is available, reading a single byte
/// ahead into the internal peek buffer when none is buffered yet.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the underlying read.</param>
/// <returns>
/// <c>true</c> when a byte is buffered (either from an earlier peek or from this call);
/// <c>false</c> when the transport is not open, the read returned zero bytes, or the
/// read failed with an <see cref="IOException"/>.
/// </returns>
public async ValueTask<bool> PeekAsync(CancellationToken cancellationToken)
{
    // A byte from an earlier peek is still waiting to be consumed.
    if (_hasPeekByte)
    {
        return true;
    }

    // Nothing can be read from a transport that is not open.
    if (!IsOpen)
    {
        return false;
    }

    // Read exactly one byte ahead; it stays in _peekBuffer for the next read.
    int bytesRead;
    try
    {
        bytesRead = await ReadAsync(_peekBuffer, 0, 1, cancellationToken);
    }
    catch (IOException)
    {
        // An I/O failure is reported as "nothing to peek" rather than propagated.
        return false;
    }

    if (bytesRead == 0)
    {
        return false;
    }

    _hasPeekByte = true;
    return true;
}
72
/// <summary>
/// Opens the transport without cancellation support; forwards to
/// <see cref="OpenAsync(CancellationToken)"/> with <see cref="CancellationToken.None"/>.
/// </summary>
public virtual async Task OpenAsync() => await OpenAsync(CancellationToken.None);

/// <summary>
/// Opens the transport. The actual behavior is supplied by the derived class.
/// </summary>
/// <param name="cancellationToken">Token the implementation may observe to abort the open.</param>
public abstract Task OpenAsync(CancellationToken cancellationToken);
79
/// <summary>
/// Closes the transport. The actual behavior is supplied by the derived class.
/// </summary>
public abstract void Close();
81
/// <summary>
/// Validates the arguments of a buffer-based read or write call.
/// </summary>
/// <param name="buffer">Buffer the caller wants to read into or write from.</param>
/// <param name="offset">Index in <paramref name="buffer"/> where the operation starts.</param>
/// <param name="length">Number of bytes the operation covers.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// DEBUG builds only: <paramref name="offset"/> or <paramref name="length"/> is negative,
/// or the requested range does not fit inside <paramref name="buffer"/>.
/// </exception>
protected static void ValidateBufferArgs(byte[] buffer, int offset, int length)
{
    if (buffer == null)
    {
        throw new ArgumentNullException(nameof(buffer));
    }

#if DEBUG // let it fail with OutOfRange in RELEASE mode
    if (offset < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(offset), "Buffer offset must be >= 0");
    }

    if (length < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(length), "Buffer length must be >= 0");
    }

    // Compare against the space left after offset instead of computing offset + length:
    // the sum can overflow int and wrap to a negative value, which would slip past the check.
    // offset is known to be >= 0 here, so the subtraction itself cannot overflow.
    if (length > buffer.Length - offset)
    {
        throw new ArgumentOutOfRangeException(nameof(buffer), "Not enough data");
    }
#endif
}
106
/// <summary>
/// Reads from the transport without cancellation support; forwards to the
/// cancellable overload with <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="buffer">Buffer that receives the data.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
/// <param name="length">Maximum number of bytes to read.</param>
/// <returns>The value returned by the cancellable overload.</returns>
public virtual async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length) =>
    await ReadAsync(buffer, offset, length, CancellationToken.None);

/// <summary>
/// Reads from the transport. The actual behavior is supplied by the derived class.
/// </summary>
/// <param name="buffer">Buffer that receives the data.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
/// <param name="length">Maximum number of bytes to read.</param>
/// <param name="cancellationToken">Token the implementation may observe to abort the read.</param>
/// <returns>
/// The number of bytes read; callers in this class treat a result of zero or less as
/// "no more data".
/// </returns>
public abstract ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);
113
/// <summary>
/// Reads exactly <paramref name="length"/> bytes without cancellation support; forwards
/// to the cancellable overload with <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="buffer">Buffer that receives the data.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
/// <param name="length">Number of bytes to read.</param>
/// <returns>The value returned by the cancellable overload.</returns>
public virtual async ValueTask<int> ReadAllAsync(byte[] buffer, int offset, int length) =>
    await ReadAllAsync(buffer, offset, length, CancellationToken.None);
118
/// <summary>
/// Keeps reading until <paramref name="length"/> bytes have been stored in
/// <paramref name="buffer"/>, starting with a previously peeked byte if one is pending.
/// </summary>
/// <param name="buffer">Buffer that receives the data.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
/// <param name="length">Number of bytes to read.</param>
/// <param name="cancellationToken">
/// Token checked once up front and forwarded to every underlying read.
/// </param>
/// <returns>
/// The total number of bytes obtained, or 0 when <paramref name="length"/> is zero or negative.
/// </returns>
/// <exception cref="TTransportException">
/// An underlying read returned zero or fewer bytes before the requested amount was reached.
/// </exception>
public virtual async ValueTask<int> ReadAllAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
    ValidateBufferArgs(buffer, offset, length);

    // Already cancelled: finish as cancelled before touching the transport.
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<int>(cancellationToken);
    }

    if (length <= 0)
    {
        return 0;
    }

    var bytesRead = 0;

    // A byte captured by an earlier peek must be delivered before any new data.
    if (_hasPeekByte)
    {
        buffer[offset] = _peekBuffer[0];
        offset++;
        _hasPeekByte = false;
        bytesRead = 1;

        if (length == 1)
        {
            return 1; // the peeked byte was all the caller asked for
        }
    }

    var bytesLeft = length - bytesRead;
    Debug.Assert(bytesLeft > 0); // every other case has returned above

    for (;;)
    {
        var chunk = await ReadAsync(buffer, offset, bytesLeft, cancellationToken);
        bytesRead += chunk;

        if (bytesRead >= length)
        {
            return bytesRead; // request satisfied
        }

        // Still short of the target and the read produced nothing: the stream has ended.
        if (chunk <= 0)
        {
            throw new TTransportException(TTransportException.ExceptionType.EndOfFile,
                "Cannot read, Remote side has closed");
        }

        offset += chunk;
        bytesLeft -= chunk;
    }
}
163
/// <summary>
/// Writes the whole of <paramref name="buffer"/> without cancellation support; forwards
/// to the <c>(byte[], CancellationToken)</c> overload with <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="buffer">Data to write.</param>
public virtual async Task WriteAsync(byte[] buffer) =>
    await WriteAsync(buffer, CancellationToken.None);
168
/// <summary>
/// Writes the whole of <paramref name="buffer"/>, forwarding to the
/// offset/length overload with offset 0 and the buffer's full length.
/// </summary>
/// <param name="buffer">Data to write; its entire contents are sent.</param>
/// <param name="cancellationToken">Token passed through to the underlying write.</param>
public virtual async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
{
    // Pass the caller's token on; previously CancellationToken.None was sent instead,
    // which made this overload impossible to cancel.
    await WriteAsync(buffer, 0, buffer.Length, cancellationToken);
}
173
/// <summary>
/// Writes a slice of <paramref name="buffer"/> without cancellation support; forwards
/// to the cancellable overload with <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="buffer">Buffer containing the data to write.</param>
/// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
/// <param name="length">Number of bytes to write.</param>
public virtual async Task WriteAsync(byte[] buffer, int offset, int length) =>
    await WriteAsync(buffer, offset, length, CancellationToken.None);

/// <summary>
/// Writes a slice of <paramref name="buffer"/> to the transport. The actual behavior is
/// supplied by the derived class.
/// </summary>
/// <param name="buffer">Buffer containing the data to write.</param>
/// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
/// <param name="length">Number of bytes to write.</param>
/// <param name="cancellationToken">Token the implementation may observe to abort the write.</param>
public abstract Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);
180
/// <summary>
/// Flushes the transport without cancellation support; forwards to
/// <see cref="FlushAsync(CancellationToken)"/> with <see cref="CancellationToken.None"/>.
/// </summary>
public virtual async Task FlushAsync() => await FlushAsync(CancellationToken.None);

/// <summary>
/// Flushes the transport. The actual behavior is supplied by the derived class.
/// </summary>
/// <param name="cancellationToken">Token the implementation may observe to abort the flush.</param>
public abstract Task FlushAsync(CancellationToken cancellationToken);
187
/// <summary>
/// Performs the actual cleanup for the transport. Implemented by derived classes;
/// the public <see cref="Dispose()"/> method calls this with <c>true</c>.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when invoked from <see cref="Dispose()"/>.
/// </param>
protected abstract void Dispose(bool disposing);
189 }
190}