2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
23 using System.Collections.Generic;
26 using System.Threading;
28 using System.Security.Cryptography.X509Certificates;
29 using System.IO.Compression;
31 namespace Thrift.Transport
/// <summary>
/// Thrift transport that exchanges data over HTTP: bytes passed to Write() are
/// buffered in memory, sent as the body of a POST request built by CreateRequest(),
/// and the response body is then exposed through Read().
/// </summary>
33 public class THttpClient : TTransport, IDisposable
// Endpoint handed to WebRequest.Create whenever a request is built.
35 private readonly Uri uri;
// Client certificates added to each request's ClientCertificates collection.
36 private readonly X509Certificate[] certificates;
// Response data served by Read(); Read() reports NotOpen while this is null.
37 private Stream inputStream;
// Collects everything passed to Write() until the next flush.
38 private MemoryStream outputStream = new MemoryStream();
40 // Timeouts in milliseconds
// Applied as HttpWebRequest.Timeout, and only when greater than zero.
41 private int connectTimeout = 30000;
// Applied as HttpWebRequest.ReadWriteTimeout.
43 private int readTimeout = 30000;
// Extra headers copied onto every request by CreateRequest().
45 private IDictionary<string, string> customHeaders = new Dictionary<string, string>();
// Sent as the User-Agent of every request; replaceable through the constructors
// that take a userAgent argument.
46 private string userAgent = "C#/THttpClient";
// Proxy assigned to every request; starts out as the process-wide default proxy.
49 private IWebProxy proxy = WebRequest.DefaultWebProxy;
/// <summary>
/// Creates a client for the given endpoint without client certificates by chaining
/// to the certificate-taking overload with an empty sequence.
/// </summary>
/// <param name="u">Endpoint URI, forwarded unchanged to the chained constructor.</param>
52 public THttpClient(Uri u)
53 : this(u, Enumerable.Empty<X509Certificate>())
/// <summary>
/// Creates a client with a custom user agent and no client certificates by chaining
/// to the three-argument overload with an empty certificate sequence.
/// </summary>
/// <param name="u">Endpoint URI, forwarded unchanged to the chained constructor.</param>
/// <param name="userAgent">User-agent string, forwarded unchanged to the chained constructor.</param>
56 public THttpClient(Uri u, string userAgent)
57 : this(u, userAgent, Enumerable.Empty<X509Certificate>())
/// <summary>
/// Creates a client that attaches the given client certificates to its requests.
/// </summary>
/// <param name="u">
/// Endpoint URI. NOTE(review): the statement that stores it is not visible in this
/// extract — presumably it is assigned to the uri field; confirm against the full file.
/// </param>
/// <param name="certificates">Client certificates; null is treated as an empty sequence.</param>
61 public THttpClient(Uri u, IEnumerable<X509Certificate> certificates)
// Materialise into an array (null becomes empty) so the field never holds a lazy
// or null sequence.
64 this.certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();
/// <summary>
/// Creates a client with a custom user agent that attaches the given client
/// certificates to its requests.
/// </summary>
/// <param name="u">
/// Endpoint URI. NOTE(review): the statement that stores it is not visible in this
/// extract — presumably it is assigned to the uri field; confirm against the full file.
/// </param>
/// <param name="userAgent">Replaces the default user-agent string; stored as given, without a null check.</param>
/// <param name="certificates">Client certificates; null is treated as an empty sequence.</param>
66 public THttpClient(Uri u, string userAgent, IEnumerable<X509Certificate> certificates)
69 this.userAgent = userAgent;
// Materialise into an array (null becomes empty) so the field never holds a lazy
// or null sequence.
70 this.certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();
/// <summary>
/// Connect timeout in milliseconds. The value is stored in connectTimeout, which
/// CreateRequest() applies as the request's Timeout only when it is greater than zero.
/// NOTE(review): only the assignment is visible in this extract, so whether a getter
/// exists cannot be told from here.
/// </summary>
73 public int ConnectTimeout
77 connectTimeout = value;
/// <summary>
/// Read timeout for requests.
/// NOTE(review): the accessor body is not visible in this extract — presumably it
/// assigns the readTimeout field (milliseconds), which CreateRequest() uses for the
/// request's ReadWriteTimeout; confirm against the full file.
/// </summary>
81 public int ReadTimeout
/// <summary>
/// Additional HTTP headers as name/value pairs.
/// NOTE(review): the accessor body is not visible in this extract — presumably it
/// exposes the customHeaders dictionary that CreateRequest() copies onto every
/// request; confirm against the full file.
/// </summary>
89 public IDictionary<string, string> CustomHeaders
/// <summary>
/// Web proxy for outgoing requests.
/// NOTE(review): the accessor body is not visible in this extract — presumably it
/// assigns the proxy field that CreateRequest() sets on every request; confirm
/// against the full file.
/// </summary>
98 public IWebProxy Proxy
/// <summary>
/// Reports whether the transport is open (override of the base transport member).
/// NOTE(review): the getter body is not visible in this extract, so the value it
/// returns cannot be stated from here.
/// </summary>
107 public override bool IsOpen
/// <summary>
/// Opens the transport (override of the base transport member).
/// NOTE(review): no body statements are visible in this extract; each request
/// creates its own connection in CreateRequest(), so this is presumably a no-op —
/// confirm against the full file.
/// </summary>
115 public override void Open()
/// <summary>
/// Releases the transport's streams: the output buffer is closed when present, and
/// the input stream is null-checked as well.
/// </summary>
119 public override void Close()
// NOTE(review): the statements guarded by this check are not visible in this
// extract — presumably the input stream is closed and cleared; confirm.
121 if (inputStream != null)
126 if (outputStream != null)
// NOTE(review): Write() uses outputStream without a null check, so confirm what the
// field holds after this call (a following statement may be missing from this extract).
128 outputStream.Close();
/// <summary>
/// Reads response bytes from the input stream into <paramref name="buf"/>.
/// </summary>
/// <param name="buf">Destination buffer.</param>
/// <param name="off">Offset in <paramref name="buf"/> at which storing begins.</param>
/// <param name="len">Maximum number of bytes to read.</param>
/// <returns>
/// NOTE(review): the return statement is not visible in this extract — presumably
/// the byte count reported by Stream.Read; confirm.
/// </returns>
/// <exception cref="TTransportException">
/// NotOpen when no response stream exists yet; EndOfFile under a condition that is
/// not visible in this extract; Unknown when the underlying read raises an IOException.
/// </exception>
133 public override int Read(byte[] buf, int off, int len)
// Nothing can be read before a flush has produced a response stream.
135 if (inputStream == null)
137 throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
142 int ret = inputStream.Read(buf, off, len);
// NOTE(review): the guard for this throw is missing from this extract. Stream.Read
// signals end of stream by returning 0 (never a negative value) — confirm that the
// missing condition tests for that.
146 throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
151 catch (IOException iox)
// The full exception text becomes the message; the exception object itself is passed
// as the third constructor argument (presumably the inner exception).
153 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString(), iox);
/// <summary>
/// Appends bytes to the in-memory output buffer. Nothing is sent here; the buffer
/// contents become the request body when SendRequest() or BeginFlush() snapshots them.
/// </summary>
/// <param name="buf">Source buffer.</param>
/// <param name="off">Offset in <paramref name="buf"/> of the first byte to append.</param>
/// <param name="len">Number of bytes to append.</param>
157 public override void Write(byte[] buf, int off, int len)
159 outputStream.Write(buf, off, len);
/// <summary>
/// Flushes the transport and starts a fresh output buffer.
/// NOTE(review): the statements before the buffer reset are not visible in this
/// extract — presumably SendRequest() is invoked first; confirm against the full file.
/// </summary>
163 public override void Flush()
// Replace (rather than rewind) the buffer so the next message starts empty.
171 outputStream = new MemoryStream();
/// <summary>
/// Sends the buffered output as one HTTP request, copies the complete response body
/// into an in-memory stream stored in inputStream, and decompresses it according to
/// the response's Content-Encoding values.
/// </summary>
/// <exception cref="TTransportException">
/// Type Unknown, raised for any IOException or WebException thrown while sending the
/// request or reading the response.
/// </exception>
175 private void SendRequest()
179 HttpWebRequest connection = CreateRequest();
// Tell the server that compressed responses are acceptable; they are unpacked below.
180 connection.Headers.Add("Accept-Encoding", "gzip, deflate");
// Snapshot everything written so far; this becomes the request body.
182 byte[] data = outputStream.ToArray();
183 connection.ContentLength = data.Length;
185 using (Stream requestStream = connection.GetRequestStream())
187 requestStream.Write(data, 0, data.Length);
189 // Resolve an HTTP hang that can happen after successive calls by making sure
190 // that we release the response and response stream. To support this, we copy
191 // the response to a memory stream.
192 using (var response = connection.GetResponse())
194 using (var responseStream = response.GetResponseStream())
196 // Copy the response to a memory stream so that we can
197 // cleanly close the response and response stream.
198 inputStream = new MemoryStream();
199 byte[] buffer = new byte[8192]; // multiple of 4096
// Drain the response; a read count of 0 (or less) ends the loop.
201 while ((bytesRead = responseStream.Read(buffer, 0, buffer.Length)) > 0)
203 inputStream.Write(buffer, 0, bytesRead);
// Rewind to the start (origin value 0 is SeekOrigin.Begin) so that reading begins
// at the first response byte.
205 inputStream.Seek(0, 0);
// The header may be absent, hence the null check before iterating.
208 var encodings = response.Headers.GetValues("Content-Encoding");
209 if (encodings != null)
211 foreach (var encoding in encodings)
// NOTE(review): the branch labels that select between the two calls below are not
// visible in this extract — presumably "gzip" and "deflate"; confirm.
216 DecompressGZipped(ref inputStream);
219 DecompressDeflated(ref inputStream);
229 catch (IOException iox)
231 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString(), iox);
233 catch (WebException wx)
// NOTE(review): WebException is also raised for HTTP error statuses and timeouts,
// so the "Couldn't connect" prefix can be misleading; the appended exception text
// carries the actual cause.
235 throw new TTransportException(TTransportException.ExceptionType.Unknown, "Couldn't connect to server: " + wx, wx);
/// <summary>
/// Decompresses a deflate-encoded stream that is passed by reference.
/// </summary>
/// <param name="inputStream">
/// Stream holding deflate data; it is wrapped in a DeflateStream, disposed, and then
/// rewound via Seek. Note that this parameter hides the field of the same name.
/// NOTE(review): the copy into tmp and the reassignment of this parameter are not
/// visible in this extract — presumably the decompressed bytes are copied into tmp
/// and tmp is assigned back before the final Seek (otherwise Seek would hit a
/// disposed stream); confirm.
/// </param>
239 private void DecompressDeflated(ref Stream inputStream)
// Receives the decompressed bytes.
241 var tmp = new MemoryStream();
242 using (var decomp = new DeflateStream(inputStream, CompressionMode.Decompress))
// The compressed source is no longer needed once it has been consumed.
246 inputStream.Dispose();
// Rewind (origin value 0 is SeekOrigin.Begin) so the caller reads from the start.
248 inputStream.Seek(0, 0);
/// <summary>
/// Decompresses a gzip-encoded stream that is passed by reference.
/// </summary>
/// <param name="inputStream">
/// Stream holding gzip data; it is wrapped in a GZipStream, disposed, and then
/// rewound via Seek. Note that this parameter hides the field of the same name.
/// NOTE(review): the copy into tmp and the reassignment of this parameter are not
/// visible in this extract — presumably the decompressed bytes are copied into tmp
/// and tmp is assigned back before the final Seek (otherwise Seek would hit a
/// disposed stream); confirm.
/// </param>
251 private void DecompressGZipped(ref Stream inputStream)
// Receives the decompressed bytes.
253 var tmp = new MemoryStream();
254 using (var decomp = new GZipStream(inputStream, CompressionMode.Decompress))
// The compressed source is no longer needed once it has been consumed.
258 inputStream.Dispose();
// Rewind (origin value 0 is SeekOrigin.Begin) so the caller reads from the start.
260 inputStream.Seek(0, 0);
/// <summary>
/// Builds a new HTTP POST request for the configured endpoint and applies the
/// client's settings: certificates, timeouts, Thrift content headers, user agent,
/// custom headers and proxy. Used by both the synchronous and asynchronous flush paths.
/// </summary>
/// <returns>
/// The configured request. NOTE(review): the return statement is not visible in
/// this extract — presumably the connection object built here.
/// </returns>
263 private HttpWebRequest CreateRequest()
265 HttpWebRequest connection = (HttpWebRequest)WebRequest.Create(uri);
269 // Adding certificates through code is not supported with WP7 Silverlight
270 // see "Windows Phone 7 and Certificates_FINAL_121610.pdf"
271 connection.ClientCertificates.AddRange(certificates);
// A non-positive connect timeout leaves the request's default Timeout untouched.
273 if (connectTimeout > 0)
275 connection.Timeout = connectTimeout;
// NOTE(review): any guard around this assignment is not visible in this extract.
279 connection.ReadWriteTimeout = readTimeout;
// Thrift payloads are both sent and expected back as application/x-thrift.
283 connection.ContentType = "application/x-thrift";
284 connection.Accept = "application/x-thrift";
285 connection.UserAgent = userAgent;
286 connection.Method = "POST";
// Requests are issued as HTTP/1.0.
288 connection.ProtocolVersion = HttpVersion.Version10;
291 //add custom headers here
292 foreach (KeyValuePair<string, string> item in customHeaders)
// NOTE(review): two alternative ways of setting the header appear on consecutive
// visible lines — presumably they are separated by conditional-compilation
// directives missing from this extract; confirm. Headers.Add rejects restricted
// header names, which matters if callers put such names into CustomHeaders.
295 connection.Headers.Add(item.Key, item.Value);
297 connection.Headers[item.Key] = item.Value;
302 connection.Proxy = proxy;
/// <summary>
/// Starts an asynchronous flush: snapshots the buffered output, creates the request
/// and begins acquiring its request stream. The remaining work continues in
/// GetRequestStreamCallback and GetResponseCallback.
/// </summary>
/// <param name="callback">Passed to the FlushAsyncResult that tracks this flush; may be null.</param>
/// <param name="state">Caller-defined state object, passed to the FlushAsyncResult.</param>
/// <returns>The FlushAsyncResult tracking this operation; hand it to EndFlush.</returns>
/// <exception cref="TTransportException">Raised when an IOException occurs while starting the request.</exception>
308 public override IAsyncResult BeginFlush(AsyncCallback callback, object state)
310 // Extract request and reset buffer
// NOTE(review): only the extraction is visible in this extract; a buffer reset, if
// any, is on a line that is missing here. EndFlush also replaces the buffer.
311 var data = outputStream.ToArray();
313 //requestBuffer_ = new MemoryStream();
317 // Create connection object
318 var flushAsyncResult = new FlushAsyncResult(callback, state);
319 flushAsyncResult.Connection = CreateRequest();
// The payload travels with the async result so the callback can write it later.
321 flushAsyncResult.Data = data;
// The tracking object is supplied as the async state, which is how the callbacks
// get hold of it again.
324 flushAsyncResult.Connection.BeginGetRequestStream(GetRequestStreamCallback, flushAsyncResult);
325 return flushAsyncResult;
328 catch (IOException iox)
330 throw new TTransportException(iox.ToString(), iox);
/// <summary>
/// Completes an asynchronous flush: waits for the operation to finish if it has not
/// done so yet, rethrows any error recorded by the callbacks, and starts a fresh
/// output buffer.
/// </summary>
/// <param name="asyncResult">
/// The object returned by BeginFlush. It is cast directly to FlushAsyncResult, so
/// any other IAsyncResult implementation causes an InvalidCastException.
/// </param>
/// <exception cref="TTransportException">The error recorded during the asynchronous request, if any.</exception>
334 public override void EndFlush(IAsyncResult asyncResult)
338 var flushAsyncResult = (FlushAsyncResult)asyncResult;
340 if (!flushAsyncResult.IsCompleted)
342 var waitHandle = flushAsyncResult.AsyncWaitHandle;
343 waitHandle.WaitOne(); // blocks without a timeout until completion is signalled
347 if (flushAsyncResult.AsyncException != null)
// NOTE(review): rethrowing a stored exception object replaces its stack trace with
// this location; the original cause is passed to its constructor by the callbacks.
349 throw flushAsyncResult.AsyncException;
// NOTE(review): the enclosing construct is not visible in this extract — presumably
// this reset runs in a finally block so it also happens on failure; confirm.
354 outputStream = new MemoryStream();
/// <summary>
/// Continuation of BeginFlush: once the request stream is available, writes the
/// captured payload to it and begins waiting for the response. Any exception is
/// recorded on the FlushAsyncResult, which is then marked complete and its callback
/// notified, instead of being thrown on the callback thread.
/// </summary>
/// <param name="asynchronousResult">
/// Result of BeginGetRequestStream; its AsyncState is the FlushAsyncResult supplied
/// by BeginFlush.
/// </param>
359 private void GetRequestStreamCallback(IAsyncResult asynchronousResult)
361 var flushAsyncResult = (FlushAsyncResult)asynchronousResult.AsyncState;
364 var reqStream = flushAsyncResult.Connection.EndGetRequestStream(asynchronousResult);
365 reqStream.Write(flushAsyncResult.Data, 0, flushAsyncResult.Data.Length);
// NOTE(review): no flush/close of reqStream is visible in this extract — confirm
// the stream is released before the response is requested.
369 // Start the asynchronous operation to get the response
370 flushAsyncResult.Connection.BeginGetResponse(GetResponseCallback, flushAsyncResult);
372 catch (Exception exception)
// The flush ends here on failure: store the error for EndFlush, then signal completion.
374 flushAsyncResult.AsyncException = new TTransportException(exception.ToString(), exception);
375 flushAsyncResult.UpdateStatusToComplete();
376 flushAsyncResult.NotifyCallbackWhenAvailable();
/// <summary>
/// Final continuation of an asynchronous flush: obtains the response and makes its
/// stream the transport's input stream, or records the failure, then marks the
/// FlushAsyncResult complete and notifies its callback.
/// </summary>
/// <param name="asynchronousResult">
/// Result of BeginGetResponse; its AsyncState is the FlushAsyncResult supplied by
/// GetRequestStreamCallback.
/// </param>
380 private void GetResponseCallback(IAsyncResult asynchronousResult)
382 var flushAsyncResult = (FlushAsyncResult)asynchronousResult.AsyncState;
// Unlike SendRequest(), the live response stream is used directly rather than
// copied into memory, and no Content-Encoding handling takes place.
// NOTE(review): the WebResponse returned by EndGetResponse is not retained, so
// nothing visible here disposes it — confirm this cannot cause the connection hang
// that SendRequest() works around by buffering.
385 inputStream = flushAsyncResult.Connection.EndGetResponse(asynchronousResult).GetResponseStream();
387 catch (Exception exception)
389 flushAsyncResult.AsyncException = new TTransportException(exception.ToString(), exception);
// NOTE(review): the closing of the catch block is not visible in this extract —
// presumably the two calls below run on both the success and the failure path.
391 flushAsyncResult.UpdateStatusToComplete();
392 flushAsyncResult.NotifyCallbackWhenAvailable();
395 // Based on http://msmvps.com/blogs/luisabreu/archive/2009/06/15/multithreading-implementing-the-iasyncresult-interface.aspx
/// <summary>
/// IAsyncResult implementation that tracks one asynchronous flush. It carries the
/// request, the payload and any recorded error between BeginFlush, the request
/// callbacks and EndFlush.
/// </summary>
396 class FlushAsyncResult : IAsyncResult
// Completion flag; volatile because it is written by callback code and read via IsCompleted.
398 private volatile Boolean _isCompleted;
// Wait handle created by GetEvtHandle(); UpdateStatusToComplete() sets it "when it exists".
399 private ManualResetEvent _evt;
// Caller's completion callback; may be null.
400 private readonly AsyncCallback _cbMethod;
// Caller's state object, returned by AsyncState.
401 private readonly object _state;
/// <summary>Creates the tracking object for one flush.</summary>
/// <param name="cbMethod">Completion callback; may be null.</param>
/// <param name="state">
/// Caller-defined state. NOTE(review): the statement storing it is not visible in
/// this extract — presumably assigned to _state; confirm.
/// </param>
403 public FlushAsyncResult(AsyncCallback cbMethod, object state)
405 _cbMethod = cbMethod;
// Request payload captured by BeginFlush.
409 internal byte[] Data { get; set; }
// The HTTP request this flush is running on.
410 internal HttpWebRequest Connection { get; set; }
// Error recorded by the callbacks; EndFlush rethrows it when non-null.
411 internal TTransportException AsyncException { get; set; }
413 public object AsyncState
415 get { return _state; }
// Backed by GetEvtHandle(), so the handle is obtained on request.
417 public WaitHandle AsyncWaitHandle
419 get { return GetEvtHandle(); }
// Always false: this type never reports synchronous completion.
421 public bool CompletedSynchronously
423 get { return false; }
425 public bool IsCompleted
427 get { return _isCompleted; }
// Private lock object. NOTE(review): the statements using it are not visible in this extract.
429 private readonly object _locker = new object();
/// <summary>
/// Provides the wait handle behind AsyncWaitHandle.
/// NOTE(review): most of the body is not visible in this extract — presumably the
/// event is created once under _locker and set immediately if the operation has
/// already completed; confirm.
/// </summary>
430 private ManualResetEvent GetEvtHandle()
// The event starts out non-signalled.
436 _evt = new ManualResetEvent(false);
/// <summary>Marks the operation complete and signals the wait handle.</summary>
445 internal void UpdateStatusToComplete()
447 _isCompleted = true; //1. set _iscompleted to true
// NOTE(review): the guard implied by "when it exists" is not visible in this extract.
452 _evt.Set(); //2. set the event, when it exists
/// <summary>
/// Notifies the caller's callback when one was supplied.
/// NOTE(review): the invocation itself is not visible in this extract — presumably
/// the callback is invoked with this instance; confirm.
/// </summary>
457 internal void NotifyCallbackWhenAvailable()
459 if (_cbMethod != null)
466 #region " IDisposable Support "
467 private bool _IsDisposed;
470 protected override void Dispose(bool disposing)
476 if (inputStream != null)
477 inputStream.Dispose();
478 if (outputStream != null)
479 outputStream.Dispose();