]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | using System; | |
19 | using System.Collections.Generic; | |
20 | using System.IO; | |
21 | using System.Linq; | |
22 | using System.Net.Http; | |
23 | using System.Net.Http.Headers; | |
24 | using System.Security.Cryptography.X509Certificates; | |
25 | using System.Threading; | |
26 | using System.Threading.Tasks; | |
27 | ||
28 | namespace Thrift.Transport.Client | |
29 | { | |
30 | // ReSharper disable once InconsistentNaming | |
/// <summary>
/// Client transport that collects written bytes in an in-memory buffer and, on flush,
/// sends them as the body of one HTTP POST to the configured URI. Reads are served from
/// the response body of the most recent successful POST.
/// </summary>
public class THttpTransport : TTransport
{
    private readonly X509Certificate[] _certificates;
    private readonly Uri _uri;

    private readonly int _connectTimeout = 30000; // Timeouts in milliseconds
    private HttpClient _httpClient;
    private Stream _inputStream;
    private MemoryStream _outputStream = new MemoryStream();
    private bool _isDisposed;

    /// <summary>
    /// Creates a transport for <paramref name="uri"/> without client certificates.
    /// </summary>
    /// <param name="uri">Endpoint every flush posts to.</param>
    /// <param name="customRequestHeaders">Optional headers added to every request; may be null.</param>
    /// <param name="userAgent">Optional replacement for the default User-Agent; ignored when null or empty.</param>
    public THttpTransport(Uri uri, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)
        : this(uri, Enumerable.Empty<X509Certificate>(), customRequestHeaders, userAgent)
    {
    }

    /// <summary>
    /// Creates a transport for <paramref name="uri"/> that offers the given client certificates.
    /// </summary>
    /// <param name="uri">Endpoint every flush posts to.</param>
    /// <param name="certificates">Client certificates handed to the HTTP handler; null is treated as empty.</param>
    /// <param name="customRequestHeaders">Optional headers added to every request; may be null.</param>
    /// <param name="userAgent">Optional replacement for the default User-Agent; ignored when null or empty.</param>
    /// <exception cref="ArgumentNullException"><paramref name="uri"/> is null.</exception>
    public THttpTransport(Uri uri, IEnumerable<X509Certificate> certificates,
        IDictionary<string, string> customRequestHeaders, string userAgent = null)
    {
        // Fail fast: a null URI can never be posted to, so there is no point deferring the error to flush time.
        _uri = uri ?? throw new ArgumentNullException(nameof(uri));
        _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();

        // Must be assigned before CreateClient, which copies UserAgent into the default headers.
        if (!string.IsNullOrEmpty(userAgent))
        {
            UserAgent = userAgent;
        }

        // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809
        // this can be switched to default way (create client->use->dispose per flush) later
        _httpClient = CreateClient(customRequestHeaders);
    }

    // According to RFC 2616 section 3.8, the "User-Agent" header may not carry a version number
    public readonly string UserAgent = "Thrift netstd THttpClient";

    /// <summary>
    /// Always reports true; this implementation does not track an open/closed state here,
    /// so the value stays true even after <c>Close</c> has released the HTTP client.
    /// </summary>
    public override bool IsOpen => true;

    /// <summary>
    /// Default headers of the underlying HTTP client. The client reference is cleared by
    /// <c>Close</c>, so this property must not be used afterwards.
    /// </summary>
    public HttpRequestHeaders RequestHeaders => _httpClient.DefaultRequestHeaders;

    /// <summary>
    /// Content type used for the POST body; when null, flush falls back to "application/x-thrift".
    /// </summary>
    public MediaTypeHeaderValue ContentType { get; set; }

    /// <summary>
    /// Performs no connection work; it only observes the cancellation token.
    /// </summary>
    public override async Task OpenAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }
    }

    /// <summary>
    /// Disposes the response stream, the pending output buffer and the HTTP client, and clears
    /// the references so that later writes and flushes fail with a NotOpen transport error.
    /// </summary>
    public override void Close()
    {
        _inputStream?.Dispose();
        _inputStream = null;

        _outputStream?.Dispose();
        _outputStream = null;

        _httpClient?.Dispose();
        _httpClient = null;
    }

    /// <summary>
    /// Reads up to <paramref name="length"/> bytes of the last response body into <paramref name="buffer"/>.
    /// </summary>
    /// <returns>The number of bytes placed into the buffer.</returns>
    /// <exception cref="TTransportException">
    /// NotOpen when no response stream is available, EndOfFile when the response body is exhausted,
    /// Unknown when the underlying stream raises an <see cref="IOException"/>.
    /// </exception>
    public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return await Task.FromCanceled<int>(cancellationToken);
        }

        if (_inputStream == null)
        {
            throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
        }

        try
        {
            var bytesRead = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);

            // .NET streams report end-of-stream as 0 (never -1). A zero-length request also
            // yields 0, so only treat it as exhaustion when bytes were actually requested.
            if (bytesRead <= 0 && length > 0)
            {
                throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
            }

            return bytesRead;
        }
        catch (IOException iox)
        {
            throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
        }
    }

    /// <summary>
    /// Appends bytes to the in-memory request buffer; nothing is sent until the next flush.
    /// </summary>
    /// <exception cref="TTransportException">NotOpen when the transport has been closed.</exception>
    public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            await Task.FromCanceled(cancellationToken);
        }

        EnsureNotClosed();

        await _outputStream.WriteAsync(buffer, offset, length, cancellationToken);
    }

    /// <summary>
    /// Throws a NotOpen transport error when <c>Close</c> has already released the client or buffer.
    /// </summary>
    private void EnsureNotClosed()
    {
        if (_httpClient == null || _outputStream == null)
        {
            throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Transport has been closed");
        }
    }

    /// <summary>
    /// Builds the HTTP client: client certificates, gzip/deflate decompression, the timeout,
    /// the Thrift Accept header, the User-Agent and any caller-supplied headers.
    /// </summary>
    private HttpClient CreateClient(IDictionary<string, string> customRequestHeaders)
    {
        var handler = new HttpClientHandler();
        handler.ClientCertificates.AddRange(_certificates);
        handler.AutomaticDecompression = System.Net.DecompressionMethods.Deflate | System.Net.DecompressionMethods.GZip;

        var httpClient = new HttpClient(handler);

        try
        {
            if (_connectTimeout > 0)
            {
                httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout);
            }

            var headers = httpClient.DefaultRequestHeaders;
            headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift"));
            headers.UserAgent.TryParseAdd(UserAgent);

            headers.AcceptEncoding.Add(new StringWithQualityHeaderValue("deflate"));
            headers.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip"));

            if (customRequestHeaders != null)
            {
                foreach (var item in customRequestHeaders)
                {
                    headers.Add(item.Key, item.Value);
                }
            }

            return httpClient;
        }
        catch
        {
            // Header setup can throw (e.g. an invalid custom header name); do not leak the client and its handler.
            httpClient.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Posts everything written since the previous flush and makes the response body available
    /// for reading. The request buffer is replaced with an empty one whether or not the POST succeeds.
    /// </summary>
    /// <exception cref="TTransportException">
    /// NotOpen when the transport has been closed; Unknown for any failure while sending the
    /// request or obtaining the response.
    /// </exception>
    public override async Task FlushAsync(CancellationToken cancellationToken)
    {
        EnsureNotClosed();

        try
        {
            _outputStream.Seek(0, SeekOrigin.Begin);

            // Disposing the content also disposes the buffer it wraps; a fresh one is installed in finally.
            using (var contentStream = new StreamContent(_outputStream))
            {
                contentStream.Headers.ContentType = ContentType ?? new MediaTypeHeaderValue(@"application/x-thrift");

                var response = await _httpClient.PostAsync(_uri, contentStream, cancellationToken);
                if (!response.IsSuccessStatusCode)
                {
                    try
                    {
                        response.EnsureSuccessStatusCode();
                    }
                    finally
                    {
                        // Newer runtimes no longer dispose the content on failure, so release it explicitly.
                        response.Dispose();
                    }
                }

                _inputStream?.Dispose();
                _inputStream = await response.Content.ReadAsStreamAsync();
                if (_inputStream.CanSeek)
                {
                    _inputStream.Seek(0, SeekOrigin.Begin);
                }
            }
        }
        catch (IOException iox)
        {
            throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
        }
        catch (HttpRequestException wx)
        {
            throw new TTransportException(TTransportException.ExceptionType.Unknown,
                "Couldn't connect to server: " + wx);
        }
        catch (Exception ex)
        {
            throw new TTransportException(TTransportException.ExceptionType.Unknown, ex.Message);
        }
        finally
        {
            _outputStream = new MemoryStream();
        }
    }

    // IDisposable
    /// <summary>
    /// Releases the response stream, the output buffer and the HTTP client the first time it is
    /// called with <paramref name="disposing"/> set; later calls do nothing.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (!_isDisposed && disposing)
        {
            _inputStream?.Dispose();
            _outputStream?.Dispose();
            _httpClient?.Dispose();
        }

        _isDisposed = true;
    }
}
222 | } |