1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  *
19  *
20  */
21 
22 using System;
23 using System.Collections.Generic;
24 using System.IO;
25 using System.Net;
26 using System.Threading;
27 using System.Linq;
28 using System.Security.Cryptography.X509Certificates;
29 using System.IO.Compression;
30 
31 namespace Thrift.Transport
32 {
33     public class THttpClient : TTransport, IDisposable
34     {
35         private readonly Uri uri;
36         private readonly X509Certificate[] certificates;
37         private Stream inputStream;
38         private MemoryStream outputStream = new MemoryStream();
39 
40         // Timeouts in milliseconds
41         private int connectTimeout = 30000;
42 
43         private int readTimeout = 30000;
44 
45         private IDictionary<string, string> customHeaders = new Dictionary<string, string>();
46 
47 #if !SILVERLIGHT
48         private IWebProxy proxy = WebRequest.DefaultWebProxy;
49 #endif
50 
THttpClient(Uri u)51         public THttpClient(Uri u)
52             : this(u, Enumerable.Empty<X509Certificate>())
53         {
54         }
55 
THttpClient(Uri u, IEnumerable<X509Certificate> certificates)56         public THttpClient(Uri u, IEnumerable<X509Certificate> certificates)
57         {
58             uri = u;
59             this.certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();
60         }
61 
62         public int ConnectTimeout
63         {
64             set
65             {
66                 connectTimeout = value;
67             }
68         }
69 
70         public int ReadTimeout
71         {
72             set
73             {
74                 readTimeout = value;
75             }
76         }
77 
78         public IDictionary<string, string> CustomHeaders
79         {
80             get
81             {
82                 return customHeaders;
83             }
84         }
85 
86 #if !SILVERLIGHT
87         public IWebProxy Proxy
88         {
89             set
90             {
91                 proxy = value;
92             }
93         }
94 #endif
95 
96         public override bool IsOpen
97         {
98             get
99             {
100                 return true;
101             }
102         }
103 
Open()104         public override void Open()
105         {
106         }
107 
Close()108         public override void Close()
109         {
110             if (inputStream != null)
111             {
112                 inputStream.Close();
113                 inputStream = null;
114             }
115             if (outputStream != null)
116             {
117                 outputStream.Close();
118                 outputStream = null;
119             }
120         }
121 
Read(byte[] buf, int off, int len)122         public override int Read(byte[] buf, int off, int len)
123         {
124             if (inputStream == null)
125             {
126                 throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
127             }
128 
129             try
130             {
131                 int ret = inputStream.Read(buf, off, len);
132 
133                 if (ret == -1)
134                 {
135                     throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
136                 }
137 
138                 return ret;
139             }
140             catch (IOException iox)
141             {
142                 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString(), iox);
143             }
144         }
145 
Write(byte[] buf, int off, int len)146         public override void Write(byte[] buf, int off, int len)
147         {
148             outputStream.Write(buf, off, len);
149         }
150 
151 #if !SILVERLIGHT
Flush()152         public override void Flush()
153         {
154             try
155             {
156                 SendRequest();
157             }
158             finally
159             {
160                 outputStream = new MemoryStream();
161             }
162         }
163 
SendRequest()164         private void SendRequest()
165         {
166             try
167             {
168                 HttpWebRequest connection = CreateRequest();
169                 connection.Headers.Add("Accept-Encoding", "gzip, deflate");
170 
171                 byte[] data = outputStream.ToArray();
172                 connection.ContentLength = data.Length;
173 
174                 using (Stream requestStream = connection.GetRequestStream())
175                 {
176                     requestStream.Write(data, 0, data.Length);
177 
178                     // Resolve HTTP hang that can happens after successive calls by making sure
179                     // that we release the response and response stream. To support this, we copy
180                     // the response to a memory stream.
181                     using (var response = connection.GetResponse())
182                     {
183                         using (var responseStream = response.GetResponseStream())
184                         {
185                             // Copy the response to a memory stream so that we can
186                             // cleanly close the response and response stream.
187                             inputStream = new MemoryStream();
188                             byte[] buffer = new byte[8192];  // multiple of 4096
189                             int bytesRead;
190                             while ((bytesRead = responseStream.Read(buffer, 0, buffer.Length)) > 0)
191                             {
192                                 inputStream.Write(buffer, 0, bytesRead);
193                             }
194                             inputStream.Seek(0, 0);
195                         }
196 
197                         var encodings = response.Headers.GetValues("Content-Encoding");
198                         if (encodings != null)
199                         {
200                             foreach (var encoding in encodings)
201                             {
202                                 switch (encoding)
203                                 {
204                                     case "gzip":
205                                         DecompressGZipped(ref inputStream);
206                                         break;
207                                     case "deflate":
208                                         DecompressDeflated(ref inputStream);
209                                         break;
210                                     default:
211                                         break;
212                                 }
213                             }
214                         }
215                     }
216                 }
217             }
218             catch (IOException iox)
219             {
220                 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString(), iox);
221             }
222             catch (WebException wx)
223             {
224                 throw new TTransportException(TTransportException.ExceptionType.Unknown, "Couldn't connect to server: " + wx, wx);
225             }
226         }
227 
DecompressDeflated(ref Stream inputStream)228         private void DecompressDeflated(ref Stream inputStream)
229         {
230             var tmp = new MemoryStream();
231             using (var decomp = new DeflateStream(inputStream, CompressionMode.Decompress))
232             {
233                 decomp.CopyTo(tmp);
234             }
235             inputStream.Dispose();
236             inputStream = tmp;
237             inputStream.Seek(0, 0);
238         }
239 
DecompressGZipped(ref Stream inputStream)240         private void DecompressGZipped(ref Stream inputStream)
241         {
242             var tmp = new MemoryStream();
243             using (var decomp = new GZipStream(inputStream, CompressionMode.Decompress))
244             {
245                 decomp.CopyTo(tmp);
246             }
247             inputStream.Dispose();
248             inputStream = tmp;
249             inputStream.Seek(0, 0);
250         }
251 #endif
CreateRequest()252         private HttpWebRequest CreateRequest()
253         {
254             HttpWebRequest connection = (HttpWebRequest)WebRequest.Create(uri);
255 
256 
257 #if !SILVERLIGHT
258             // Adding certificates through code is not supported with WP7 Silverlight
259             // see "Windows Phone 7 and Certificates_FINAL_121610.pdf"
260             connection.ClientCertificates.AddRange(certificates);
261 
262             if (connectTimeout > 0)
263             {
264                 connection.Timeout = connectTimeout;
265             }
266             if (readTimeout > 0)
267             {
268                 connection.ReadWriteTimeout = readTimeout;
269             }
270 #endif
271             // Make the request
272             connection.ContentType = "application/x-thrift";
273             connection.Accept = "application/x-thrift";
274             connection.UserAgent = "C#/THttpClient";
275             connection.Method = "POST";
276 #if !SILVERLIGHT
277             connection.ProtocolVersion = HttpVersion.Version10;
278 #endif
279 
280             //add custom headers here
281             foreach (KeyValuePair<string, string> item in customHeaders)
282             {
283 #if !SILVERLIGHT
284                 connection.Headers.Add(item.Key, item.Value);
285 #else
286                 connection.Headers[item.Key] = item.Value;
287 #endif
288             }
289 
290 #if !SILVERLIGHT
291             connection.Proxy = proxy;
292 #endif
293 
294             return connection;
295         }
296 
BeginFlush(AsyncCallback callback, object state)297         public override IAsyncResult BeginFlush(AsyncCallback callback, object state)
298         {
299             // Extract request and reset buffer
300             var data = outputStream.ToArray();
301 
302             //requestBuffer_ = new MemoryStream();
303 
304             try
305             {
306                 // Create connection object
307                 var flushAsyncResult = new FlushAsyncResult(callback, state);
308                 flushAsyncResult.Connection = CreateRequest();
309 
310                 flushAsyncResult.Data = data;
311 
312 
313                 flushAsyncResult.Connection.BeginGetRequestStream(GetRequestStreamCallback, flushAsyncResult);
314                 return flushAsyncResult;
315 
316             }
317             catch (IOException iox)
318             {
319                 throw new TTransportException(iox.ToString(), iox);
320             }
321         }
322 
EndFlush(IAsyncResult asyncResult)323         public override void EndFlush(IAsyncResult asyncResult)
324         {
325             try
326             {
327                 var flushAsyncResult = (FlushAsyncResult)asyncResult;
328 
329                 if (!flushAsyncResult.IsCompleted)
330                 {
331                     var waitHandle = flushAsyncResult.AsyncWaitHandle;
332                     waitHandle.WaitOne();  // blocking INFINITEly
333                     waitHandle.Close();
334                 }
335 
336                 if (flushAsyncResult.AsyncException != null)
337                 {
338                     throw flushAsyncResult.AsyncException;
339                 }
340             }
341             finally
342             {
343                 outputStream = new MemoryStream();
344             }
345 
346         }
347 
GetRequestStreamCallback(IAsyncResult asynchronousResult)348         private void GetRequestStreamCallback(IAsyncResult asynchronousResult)
349         {
350             var flushAsyncResult = (FlushAsyncResult)asynchronousResult.AsyncState;
351             try
352             {
353                 var reqStream = flushAsyncResult.Connection.EndGetRequestStream(asynchronousResult);
354                 reqStream.Write(flushAsyncResult.Data, 0, flushAsyncResult.Data.Length);
355                 reqStream.Flush();
356                 reqStream.Close();
357 
358                 // Start the asynchronous operation to get the response
359                 flushAsyncResult.Connection.BeginGetResponse(GetResponseCallback, flushAsyncResult);
360             }
361             catch (Exception exception)
362             {
363                 flushAsyncResult.AsyncException = new TTransportException(exception.ToString(), exception);
364                 flushAsyncResult.UpdateStatusToComplete();
365                 flushAsyncResult.NotifyCallbackWhenAvailable();
366             }
367         }
368 
GetResponseCallback(IAsyncResult asynchronousResult)369         private void GetResponseCallback(IAsyncResult asynchronousResult)
370         {
371             var flushAsyncResult = (FlushAsyncResult)asynchronousResult.AsyncState;
372             try
373             {
374                 inputStream = flushAsyncResult.Connection.EndGetResponse(asynchronousResult).GetResponseStream();
375             }
376             catch (Exception exception)
377             {
378                 flushAsyncResult.AsyncException = new TTransportException(exception.ToString(), exception);
379             }
380             flushAsyncResult.UpdateStatusToComplete();
381             flushAsyncResult.NotifyCallbackWhenAvailable();
382         }
383 
384         // Based on http://msmvps.com/blogs/luisabreu/archive/2009/06/15/multithreading-implementing-the-iasyncresult-interface.aspx
385         class FlushAsyncResult : IAsyncResult
386         {
387             private volatile Boolean _isCompleted;
388             private ManualResetEvent _evt;
389             private readonly AsyncCallback _cbMethod;
390             private readonly object _state;
391 
FlushAsyncResult(AsyncCallback cbMethod, object state)392             public FlushAsyncResult(AsyncCallback cbMethod, object state)
393             {
394                 _cbMethod = cbMethod;
395                 _state = state;
396             }
397 
398             internal byte[] Data { get; set; }
399             internal HttpWebRequest Connection { get; set; }
400             internal TTransportException AsyncException { get; set; }
401 
402             public object AsyncState
403             {
404                 get { return _state; }
405             }
406             public WaitHandle AsyncWaitHandle
407             {
408                 get { return GetEvtHandle(); }
409             }
410             public bool CompletedSynchronously
411             {
412                 get { return false; }
413             }
414             public bool IsCompleted
415             {
416                 get { return _isCompleted; }
417             }
418             private readonly object _locker = new object();
GetEvtHandle()419             private ManualResetEvent GetEvtHandle()
420             {
421                 lock (_locker)
422                 {
423                     if (_evt == null)
424                     {
425                         _evt = new ManualResetEvent(false);
426                     }
427                     if (_isCompleted)
428                     {
429                         _evt.Set();
430                     }
431                 }
432                 return _evt;
433             }
UpdateStatusToComplete()434             internal void UpdateStatusToComplete()
435             {
436                 _isCompleted = true; //1. set _iscompleted to true
437                 lock (_locker)
438                 {
439                     if (_evt != null)
440                     {
441                         _evt.Set(); //2. set the event, when it exists
442                     }
443                 }
444             }
445 
NotifyCallbackWhenAvailable()446             internal void NotifyCallbackWhenAvailable()
447             {
448                 if (_cbMethod != null)
449                 {
450                     _cbMethod(this);
451                 }
452             }
453         }
454 
455         #region " IDisposable Support "
456         private bool _IsDisposed;
457 
458         // IDisposable
Dispose(bool disposing)459         protected override void Dispose(bool disposing)
460         {
461             if (!_IsDisposed)
462             {
463                 if (disposing)
464                 {
465                     if (inputStream != null)
466                         inputStream.Dispose();
467                     if (outputStream != null)
468                         outputStream.Dispose();
469                 }
470             }
471             _IsDisposed = true;
472         }
473         #endregion
474     }
475 }
476