]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TFramedTransport.cs
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / netstd / Thrift / Transport / TFramedTransport.cs
CommitLineData
f67539c2
TL
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18using System;
19using System.IO;
20using System.Threading;
21using System.Threading.Tasks;
22
namespace Thrift.Transport
{
    /// <summary>
    /// Transport decorator that exchanges data with the wrapped transport in frames: each
    /// frame is a 4-byte length header (most significant byte first) followed by that many
    /// payload bytes.
    /// </summary>
    /// <remarks>
    /// Written data is accumulated in an in-memory buffer and is only handed to the inner
    /// transport by <see cref="FlushAsync(CancellationToken)"/>. Reads are served from an
    /// in-memory buffer that is refilled one whole frame at a time.
    /// </remarks>
    // ReSharper disable once InconsistentNaming
    public class TFramedTransport : TTransport
    {
        // Number of bytes occupied by the frame length header.
        private const int HeaderSize = 4;

        // Scratch space for the header of the frame currently being read.
        private readonly byte[] HeaderBuf = new byte[HeaderSize];

        // Payload of the most recently read frame.
        private readonly Client.TMemoryBufferTransport ReadBuffer = new Client.TMemoryBufferTransport();

        // Header placeholder plus payload of the frame currently being written.
        private readonly Client.TMemoryBufferTransport WriteBuffer = new Client.TMemoryBufferTransport();

        // The transport that frames are read from and written to.
        private readonly TTransport InnerTransport;

        private bool IsDisposed;

        /// <summary>
        /// Transport factory that wraps a given transport in a <see cref="TFramedTransport"/>.
        /// </summary>
        public class Factory : TTransportFactory
        {
            /// <summary>Creates a new framed transport around <paramref name="trans"/>.</summary>
            /// <param name="trans">The transport to wrap.</param>
            /// <returns>A new <see cref="TFramedTransport"/> instance.</returns>
            public override TTransport GetTransport(TTransport trans)
            {
                return new TFramedTransport(trans);
            }
        }

        /// <summary>
        /// Creates a framed transport on top of <paramref name="transport"/>.
        /// </summary>
        /// <param name="transport">The inner transport; must not be <c>null</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="transport"/> is <c>null</c>.</exception>
        public TFramedTransport(TTransport transport)
        {
            InnerTransport = transport ?? throw new ArgumentNullException(nameof(transport));

            InitWriteBuffer();
        }

        /// <summary>
        /// <c>true</c> while this instance has not been disposed and the inner transport
        /// reports that it is open.
        /// </summary>
        public override bool IsOpen => !IsDisposed && InnerTransport.IsOpen;

        /// <summary>Opens the inner transport.</summary>
        /// <param name="cancellationToken">Token forwarded to the inner transport.</param>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public override async Task OpenAsync(CancellationToken cancellationToken)
        {
            CheckNotDisposed();

            await InnerTransport.OpenAsync(cancellationToken);
        }

        /// <summary>Closes the inner transport.</summary>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public override void Close()
        {
            CheckNotDisposed();

            InnerTransport.Close();
        }

        /// <summary>
        /// Reads from the current frame, first fetching a new frame from the inner transport
        /// when the current one has been fully consumed.
        /// </summary>
        /// <param name="buffer">Destination buffer.</param>
        /// <param name="offset">Offset into <paramref name="buffer"/>.</param>
        /// <param name="length">Requested number of bytes.</param>
        /// <param name="cancellationToken">Token forwarded to the underlying reads.</param>
        /// <returns>
        /// The result of reading from the frame buffer (presumably the number of bytes
        /// copied into <paramref name="buffer"/>).
        /// </returns>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        /// <exception cref="TTransportException">The transport is not open, or the frame header is invalid.</exception>
        public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            CheckNotDisposed();
            ValidateBufferArgs(buffer, offset, length);

            if (!IsOpen)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            // Read another frame of data if we run out of bytes
            if (ReadBuffer.Position >= ReadBuffer.Length)
            {
                await ReadFrameAsync(cancellationToken);
            }

            return await ReadBuffer.ReadAsync(buffer, offset, length, cancellationToken);
        }

        /// <summary>
        /// Reads one complete frame (header, then payload) from the inner transport into
        /// <see cref="ReadBuffer"/> and rewinds the buffer to its start.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to the inner transport.</param>
        /// <exception cref="TTransportException">The decoded frame size is negative.</exception>
        private async ValueTask ReadFrameAsync(CancellationToken cancellationToken)
        {
            await InnerTransport.ReadAllAsync(HeaderBuf, 0, HeaderSize, cancellationToken);
            var size = DecodeFrameSize(HeaderBuf);

            // The header comes from the peer; a header with its top bit set decodes to a
            // negative length. Reject it here, before it is used to resize the read buffer.
            // NOTE(review): no upper bound is enforced on the frame size - consider adding
            // a configurable maximum.
            if (size < 0)
            {
                throw new TTransportException(
                    TTransportException.ExceptionType.Unknown,
                    $"Invalid frame size ({size} bytes)");
            }

            ReadBuffer.SetLength(size);
            ReadBuffer.Seek(0, SeekOrigin.Begin);

            // Fill the buffer's backing array directly with the frame payload.
            // NOTE(review): assumes the segment starts at offset 0 of its array - confirm
            // against TMemoryBufferTransport.TryGetBuffer.
            ReadBuffer.TryGetBuffer(out ArraySegment<byte> bufSegment);
            await InnerTransport.ReadAllAsync(bufSegment.Array, 0, size, cancellationToken);
        }

        /// <summary>
        /// Appends data to the pending frame. Nothing is sent to the inner transport until
        /// <see cref="FlushAsync(CancellationToken)"/> is called, unless appending would push
        /// the buffered length past <see cref="int.MaxValue"/>, in which case the pending
        /// frame is flushed first.
        /// </summary>
        /// <param name="buffer">Source buffer.</param>
        /// <param name="offset">Offset into <paramref name="buffer"/>.</param>
        /// <param name="length">Number of bytes to append.</param>
        /// <param name="cancellationToken">Token forwarded to the underlying writes.</param>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        /// <exception cref="TTransportException">The transport is not open.</exception>
        public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            CheckNotDisposed();
            ValidateBufferArgs(buffer, offset, length);

            if (!IsOpen)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            // Written as a subtraction so the comparison itself cannot overflow.
            if (WriteBuffer.Length > (int.MaxValue - length))
            {
                await FlushAsync(cancellationToken);
            }

            await WriteBuffer.WriteAsync(buffer, offset, length, cancellationToken);
        }

        /// <summary>
        /// Sends the pending frame (length header followed by the buffered payload) to the
        /// inner transport, resets the write buffer, and flushes the inner transport.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to the inner transport.</param>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        /// <exception cref="TTransportException">The transport is not open.</exception>
        /// <exception cref="InvalidOperationException">
        /// The write buffer is shorter than the reserved header space.
        /// </exception>
        public override async Task FlushAsync(CancellationToken cancellationToken)
        {
            CheckNotDisposed();

            if (!IsOpen)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            WriteBuffer.TryGetBuffer(out ArraySegment<byte> bufSegment);

            // The buffer always starts with HeaderSize reserved bytes; the rest is payload.
            int dataLen = bufSegment.Count - HeaderSize;
            if (dataLen < 0)
            {
                throw new InvalidOperationException(); // logic error actually
            }

            // Inject message header into the reserved buffer space
            EncodeFrameSize(dataLen, bufSegment.Array);

            // Send the entire message at once
            await InnerTransport.WriteAsync(bufSegment.Array, 0, bufSegment.Count, cancellationToken);

            InitWriteBuffer();

            await InnerTransport.FlushAsync(cancellationToken);
        }

        /// <summary>
        /// Resets the write buffer so that it holds only the reserved header space, and
        /// positions it at its end.
        /// </summary>
        private void InitWriteBuffer()
        {
            // Reserve space for message header to be put right before sending it out
            WriteBuffer.SetLength(HeaderSize);
            WriteBuffer.Seek(0, SeekOrigin.End);
        }

        /// <summary>
        /// Stores <paramref name="frameSize"/> in the first four bytes of
        /// <paramref name="buf"/>, most significant byte first.
        /// </summary>
        /// <param name="frameSize">The payload length to encode.</param>
        /// <param name="buf">Destination array; must hold at least four bytes.</param>
        private static void EncodeFrameSize(int frameSize, byte[] buf)
        {
            buf[0] = (byte) (0xff & (frameSize >> 24));
            buf[1] = (byte) (0xff & (frameSize >> 16));
            buf[2] = (byte) (0xff & (frameSize >> 8));
            buf[3] = (byte) (0xff & frameSize);
        }

        /// <summary>
        /// Combines the first four bytes of <paramref name="buf"/> into an <see cref="int"/>,
        /// most significant byte first.
        /// </summary>
        /// <param name="buf">Source array; must hold at least four bytes.</param>
        /// <returns>
        /// The decoded value; negative when the top bit of <c>buf[0]</c> is set.
        /// </returns>
        private static int DecodeFrameSize(byte[] buf)
        {
            // byte operands are promoted to non-negative ints, so no masking is needed.
            return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
        }

        /// <summary>
        /// Throws <see cref="ObjectDisposedException"/> if this instance has been disposed.
        /// </summary>
        private void CheckNotDisposed()
        {
            if (IsDisposed)
            {
                throw new ObjectDisposedException(this.GetType().Name);
            }
        }

        // IDisposable
        /// <summary>
        /// Disposes both internal buffers and the inner transport on the first call made
        /// with <paramref name="disposing"/> set, and marks this instance as disposed.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> to dispose the owned objects; <c>false</c> to only mark this
        /// instance as disposed.
        /// </param>
        protected override void Dispose(bool disposing)
        {
            if (!IsDisposed && disposing)
            {
                // All three fields are readonly and assigned non-null at construction.
                ReadBuffer.Dispose();
                WriteBuffer.Dispose();
                InnerTransport.Dispose();
            }

            IsDisposed = true;
        }
    }
}