]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / netstd / Thrift / Transport / Client / THttpTransport.cs
CommitLineData
f67539c2
TL
1// Licensed to the Apache Software Foundation(ASF) under one
2// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
19using System.Collections.Generic;
20using System.IO;
21using System.Linq;
22using System.Net.Http;
23using System.Net.Http.Headers;
24using System.Security.Cryptography.X509Certificates;
25using System.Threading;
26using System.Threading.Tasks;
27
28namespace Thrift.Transport.Client
29{
30 // ReSharper disable once InconsistentNaming
31 public class THttpTransport : TTransport
32 {
33 private readonly X509Certificate[] _certificates;
34 private readonly Uri _uri;
35
36 private int _connectTimeout = 30000; // Timeouts in milliseconds
37 private HttpClient _httpClient;
38 private Stream _inputStream;
39 private MemoryStream _outputStream = new MemoryStream();
40 private bool _isDisposed;
41
42 public THttpTransport(Uri uri, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)
43 : this(uri, Enumerable.Empty<X509Certificate>(), customRequestHeaders, userAgent)
44 {
45 }
46
47 public THttpTransport(Uri uri, IEnumerable<X509Certificate> certificates,
48 IDictionary<string, string> customRequestHeaders, string userAgent = null)
49 {
50 _uri = uri;
51 _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();
52
53 if (!string.IsNullOrEmpty(userAgent))
54 UserAgent = userAgent;
55
56 // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809
57 // this can be switched to default way (create client->use->dispose per flush) later
58 _httpClient = CreateClient(customRequestHeaders);
59 }
60
61 // According to RFC 2616 section 3.8, the "User-Agent" header may not carry a version number
62 public readonly string UserAgent = "Thrift netstd THttpClient";
63
64 public override bool IsOpen => true;
65
66 public HttpRequestHeaders RequestHeaders => _httpClient.DefaultRequestHeaders;
67
68 public MediaTypeHeaderValue ContentType { get; set; }
69
70 public override async Task OpenAsync(CancellationToken cancellationToken)
71 {
72 if (cancellationToken.IsCancellationRequested)
73 {
74 await Task.FromCanceled(cancellationToken);
75 }
76 }
77
78 public override void Close()
79 {
80 if (_inputStream != null)
81 {
82 _inputStream.Dispose();
83 _inputStream = null;
84 }
85
86 if (_outputStream != null)
87 {
88 _outputStream.Dispose();
89 _outputStream = null;
90 }
91
92 if (_httpClient != null)
93 {
94 _httpClient.Dispose();
95 _httpClient = null;
96 }
97 }
98
99 public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
100 {
101 if (cancellationToken.IsCancellationRequested)
102 {
103 return await Task.FromCanceled<int>(cancellationToken);
104 }
105
106 if (_inputStream == null)
107 {
108 throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
109 }
110
111 try
112 {
113 var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);
114
115 if (ret == -1)
116 {
117 throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
118 }
119
120 return ret;
121 }
122 catch (IOException iox)
123 {
124 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
125 }
126 }
127
128 public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
129 {
130 if (cancellationToken.IsCancellationRequested)
131 {
132 await Task.FromCanceled(cancellationToken);
133 }
134
135 await _outputStream.WriteAsync(buffer, offset, length, cancellationToken);
136 }
137
138 private HttpClient CreateClient(IDictionary<string, string> customRequestHeaders)
139 {
140 var handler = new HttpClientHandler();
141 handler.ClientCertificates.AddRange(_certificates);
142 handler.AutomaticDecompression = System.Net.DecompressionMethods.Deflate | System.Net.DecompressionMethods.GZip;
143
144 var httpClient = new HttpClient(handler);
145
146 if (_connectTimeout > 0)
147 {
148 httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout);
149 }
150
151 httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift"));
152 httpClient.DefaultRequestHeaders.UserAgent.TryParseAdd(UserAgent);
153
154 httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("deflate"));
155 httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip"));
156
157 if (customRequestHeaders != null)
158 {
159 foreach (var item in customRequestHeaders)
160 {
161 httpClient.DefaultRequestHeaders.Add(item.Key, item.Value);
162 }
163 }
164
165 return httpClient;
166 }
167
168 public override async Task FlushAsync(CancellationToken cancellationToken)
169 {
170 try
171 {
172 _outputStream.Seek(0, SeekOrigin.Begin);
173
174 using (var contentStream = new StreamContent(_outputStream))
175 {
176 contentStream.Headers.ContentType = ContentType ?? new MediaTypeHeaderValue(@"application/x-thrift");
177
178 var response = (await _httpClient.PostAsync(_uri, contentStream, cancellationToken)).EnsureSuccessStatusCode();
179
180 _inputStream?.Dispose();
181 _inputStream = await response.Content.ReadAsStreamAsync();
182 if (_inputStream.CanSeek)
183 {
184 _inputStream.Seek(0, SeekOrigin.Begin);
185 }
186 }
187 }
188 catch (IOException iox)
189 {
190 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
191 }
192 catch (HttpRequestException wx)
193 {
194 throw new TTransportException(TTransportException.ExceptionType.Unknown,
195 "Couldn't connect to server: " + wx);
196 }
197 catch (Exception ex)
198 {
199 throw new TTransportException(TTransportException.ExceptionType.Unknown, ex.Message);
200 }
201 finally
202 {
203 _outputStream = new MemoryStream();
204 }
205 }
206
207 // IDisposable
208 protected override void Dispose(bool disposing)
209 {
210 if (!_isDisposed)
211 {
212 if (disposing)
213 {
214 _inputStream?.Dispose();
215 _outputStream?.Dispose();
216 _httpClient?.Dispose();
217 }
218 }
219 _isDisposed = true;
220 }
221 }
222}