]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TClientTransport.cs
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / netcore / Thrift / Transports / TClientTransport.cs
CommitLineData
f67539c2
TL
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18using System;
19using System.IO;
20using System.Threading;
21using System.Threading.Tasks;
22
namespace Thrift.Transports
{
    //TODO: think about client info
    // ReSharper disable once InconsistentNaming
    /// <summary>
    /// Abstract base for client transports. Concrete transports implement the
    /// cancellable open/read/write/flush primitives; this class layers on top of
    /// them token-less convenience overloads, a one-byte peek
    /// (<see cref="PeekAsync"/>) and a read-until-full helper
    /// (<see cref="ReadAllAsync(byte[], int, int, CancellationToken)"/>).
    /// </summary>
    public abstract class TClientTransport : IDisposable
    {
        //TODO: think how to avoid peek byte
        // Holds the single byte fetched by PeekAsync until ReadAllAsync hands it out.
        private readonly byte[] _peekBuffer = new byte[1];

        // True while _peekBuffer[0] contains a byte that has been read but not yet consumed.
        private bool _hasPeekByte;

        /// <summary>Whether the transport is currently open.</summary>
        public abstract bool IsOpen { get; }

        /// <summary>
        /// Calls <see cref="Dispose(bool)"/> with <c>true</c> and suppresses finalization.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Checks whether at least one byte can be read, buffering that byte so the
        /// next <see cref="ReadAllAsync(byte[], int, int, CancellationToken)"/> returns it first.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to the underlying read.</param>
        /// <returns>
        /// <c>true</c> if a byte is buffered (now or from an earlier peek); <c>false</c> if the
        /// transport is not open, the read returned zero bytes, or the read threw an
        /// <see cref="IOException"/>.
        /// </returns>
        /// <remarks>
        /// NOTE(review): the buffered byte is handed out only by ReadAllAsync in this class;
        /// whether concrete ReadAsync implementations account for it cannot be seen here.
        /// </remarks>
        public async Task<bool> PeekAsync(CancellationToken cancellationToken)
        {
            //If we already have a byte read but not consumed, do nothing.
            if (_hasPeekByte)
            {
                return true;
            }

            //If transport closed we can't peek.
            if (!IsOpen)
            {
                return false;
            }

            //Try to read one byte. If succeeds we will need to store it for the next read.
            try
            {
                var bytes = await ReadAsync(_peekBuffer, 0, 1, cancellationToken);
                if (bytes == 0)
                {
                    return false;
                }
            }
            catch (IOException)
            {
                return false;
            }

            _hasPeekByte = true;
            return true;
        }

        /// <summary>Opens the transport without a cancellation token.</summary>
        public virtual async Task OpenAsync()
        {
            await OpenAsync(CancellationToken.None);
        }

        /// <summary>Opens the transport.</summary>
        /// <param name="cancellationToken">Token supplied to the implementation.</param>
        public abstract Task OpenAsync(CancellationToken cancellationToken);

        /// <summary>Closes the transport.</summary>
        public abstract void Close();

        /// <summary>
        /// Verifies that <paramref name="offset"/> and <paramref name="length"/> describe a
        /// range that lies inside <paramref name="buffer"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="offset"/> or <paramref name="length"/> is negative, or the range
        /// extends past the end of <paramref name="buffer"/>.
        /// </exception>
        protected static void ValidateBufferArgs(byte[] buffer, int offset, int length)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException(nameof(buffer));
            }

            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(offset), "Buffer offset is smaller than zero.");
            }

            if (length < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(length), "Buffer length is smaller than zero.");
            }

            // Compare by subtraction: "offset + length" can overflow int and wrap negative,
            // which would let an out-of-range request through. offset is known to be >= 0
            // here, so buffer.Length - offset cannot overflow.
            if (length > buffer.Length - offset)
            {
                throw new ArgumentOutOfRangeException(nameof(buffer), "Not enough data.");
            }
        }

        /// <summary>Reads without a cancellation token; see the cancellable overload.</summary>
        public virtual async Task<int> ReadAsync(byte[] buffer, int offset, int length)
        {
            return await ReadAsync(buffer, offset, length, CancellationToken.None);
        }

        /// <summary>
        /// Reads up to <paramref name="length"/> bytes into <paramref name="buffer"/> starting
        /// at <paramref name="offset"/>.
        /// </summary>
        /// <returns>
        /// The number of bytes read. Callers in this class treat a value of zero or less as
        /// "no more data".
        /// </returns>
        public abstract Task<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);

        /// <summary>Reads exactly <paramref name="length"/> bytes without a cancellation token.</summary>
        public virtual async Task<int> ReadAllAsync(byte[] buffer, int offset, int length)
        {
            return await ReadAllAsync(buffer, offset, length, CancellationToken.None);
        }

        /// <summary>
        /// Reads until exactly <paramref name="length"/> bytes have been stored in
        /// <paramref name="buffer"/> starting at <paramref name="offset"/>, using a byte
        /// buffered by <see cref="PeekAsync"/> first if there is one.
        /// </summary>
        /// <returns>The number of bytes stored, which equals <paramref name="length"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">The requested range is invalid.</exception>
        /// <exception cref="TTransportException">
        /// An underlying read returned zero or fewer bytes before the request was satisfied.
        /// </exception>
        public virtual async Task<int> ReadAllAsync(byte[] buffer, int offset, int length,
            CancellationToken cancellationToken)
        {
            ValidateBufferArgs(buffer, offset, length);

            if (cancellationToken.IsCancellationRequested)
            {
                return await Task.FromCanceled<int>(cancellationToken);
            }

            var retrieved = 0;

            //If we previously peeked a byte, we need to use that first.
            // Only when something was actually requested: for a zero-length read the byte
            // stays buffered, nothing is written to the caller's buffer, and 0 is returned.
            if (length > 0 && _hasPeekByte)
            {
                buffer[offset + retrieved++] = _peekBuffer[0];
                _hasPeekByte = false;
            }

            while (retrieved < length)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return await Task.FromCanceled<int>(cancellationToken);
                }

                var returnedCount = await ReadAsync(buffer, offset + retrieved, length - retrieved, cancellationToken);
                if (returnedCount <= 0)
                {
                    throw new TTransportException(TTransportException.ExceptionType.EndOfFile,
                        "Cannot read, Remote side has closed");
                }
                retrieved += returnedCount;
            }
            return retrieved;
        }

        /// <summary>Writes the whole of <paramref name="buffer"/> without a cancellation token.</summary>
        public virtual async Task WriteAsync(byte[] buffer)
        {
            await WriteAsync(buffer, CancellationToken.None);
        }

        /// <summary>Writes the whole of <paramref name="buffer"/>.</summary>
        /// <param name="buffer">Bytes to write.</param>
        /// <param name="cancellationToken">Token forwarded to the underlying write.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        public virtual async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException(nameof(buffer));
            }

            // Forward the caller's token; passing CancellationToken.None here would make
            // this overload impossible to cancel.
            await WriteAsync(buffer, 0, buffer.Length, cancellationToken);
        }

        /// <summary>Writes a range of <paramref name="buffer"/> without a cancellation token.</summary>
        public virtual async Task WriteAsync(byte[] buffer, int offset, int length)
        {
            await WriteAsync(buffer, offset, length, CancellationToken.None);
        }

        /// <summary>
        /// Writes <paramref name="length"/> bytes from <paramref name="buffer"/> starting at
        /// <paramref name="offset"/>.
        /// </summary>
        public abstract Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);

        /// <summary>Flushes without a cancellation token.</summary>
        public virtual async Task FlushAsync()
        {
            await FlushAsync(CancellationToken.None);
        }

        /// <summary>Flushes the transport.</summary>
        /// <param name="cancellationToken">Token supplied to the implementation.</param>
        public abstract Task FlushAsync(CancellationToken cancellationToken);

        /// <summary>Releases resources; invoked with <c>true</c> from <see cref="Dispose()"/>.</summary>
        protected abstract void Dispose(bool disposing);
    }
}