/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 *
 */

using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
using System.Linq;
using System.Security.Cryptography.X509Certificates;
using System.IO.Compression;

namespace Thrift.Transport
{
    /// <summary>
    /// Thrift transport over HTTP. Bytes passed to <see cref="Write"/> are buffered in
    /// memory; flushing sends the buffer as the body of a single HTTP POST to the
    /// configured URI, and the response body then becomes readable through
    /// <see cref="Read"/>.
    /// </summary>
    public class THttpClient : TTransport, IDisposable
    {
        private readonly Uri uri;
        private readonly X509Certificate[] certificates;
        private Stream inputStream;
        private MemoryStream outputStream = new MemoryStream();

        // Timeouts in milliseconds
        private int connectTimeout = 30000;

        private int readTimeout = 30000;

        private IDictionary<string, string> customHeaders = new Dictionary<string, string>();

#if !SILVERLIGHT
        private IWebProxy proxy = WebRequest.DefaultWebProxy;
#endif

        /// <summary>
        /// Creates a client that posts to <paramref name="u"/> without client certificates.
        /// </summary>
        /// <param name="u">Endpoint every request is sent to.</param>
        public THttpClient(Uri u)
            : this(u, Enumerable.Empty<X509Certificate>())
        {
        }

        /// <summary>
        /// Creates a client that posts to <paramref name="u"/> and attaches the given
        /// client certificates to each request (not applied on Silverlight builds).
        /// </summary>
        /// <param name="u">Endpoint every request is sent to.</param>
        /// <param name="certificates">Client certificates; <c>null</c> is treated as empty.</param>
        public THttpClient(Uri u, IEnumerable<X509Certificate> certificates)
        {
            uri = u;
            // Snapshot into an array so later changes to the caller's sequence have no effect.
            this.certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();
        }

        /// <summary>
        /// Request timeout in milliseconds. Values &lt;= 0 leave the request's default untouched.
        /// </summary>
        public int ConnectTimeout
        {
            set
            {
                connectTimeout = value;
            }
        }

        /// <summary>
        /// Read/write timeout in milliseconds. Values &lt;= 0 leave the request's default untouched.
        /// </summary>
        public int ReadTimeout
        {
            set
            {
                readTimeout = value;
            }
        }

        /// <summary>
        /// Extra headers added to every request. The returned dictionary is live:
        /// entries added to it are sent with subsequent requests.
        /// </summary>
        public IDictionary<string, string> CustomHeaders
        {
            get
            {
                return customHeaders;
            }
        }

#if !SILVERLIGHT
        /// <summary>
        /// Proxy assigned to every request; defaults to <see cref="WebRequest.DefaultWebProxy"/>.
        /// </summary>
        public IWebProxy Proxy
        {
            set
            {
                proxy = value;
            }
        }
#endif

        /// <summary>
        /// Always <c>true</c>: a connection is only established when a request is flushed.
        /// </summary>
        public override bool IsOpen
        {
            get
            {
                return true;
            }
        }

        /// <summary>
        /// No-op; see <see cref="IsOpen"/>.
        /// </summary>
        public override void Open()
        {
        }

        /// <summary>
        /// Closes and drops both the buffered response and the pending request buffer.
        /// </summary>
        public override void Close()
        {
            if (inputStream != null)
            {
                inputStream.Close();
                inputStream = null;
            }
            if (outputStream != null)
            {
                outputStream.Close();
                outputStream = null;
            }
        }

        /// <summary>
        /// Reads up to <paramref name="len"/> bytes of the most recent response into
        /// <paramref name="buf"/> starting at <paramref name="off"/>.
        /// </summary>
        /// <returns>The count reported by the underlying stream.</returns>
        /// <exception cref="TTransportException">
        /// <c>NotOpen</c> when no response is available yet; <c>Unknown</c> when the
        /// underlying stream raises an <see cref="IOException"/>.
        /// </exception>
        public override int Read(byte[] buf, int off, int len)
        {
            if (inputStream == null)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
            }

            try
            {
                int ret = inputStream.Read(buf, off, len);

                // NOTE(review): System.IO.Stream.Read reports end-of-stream as 0, not -1,
                // so this branch is not expected to fire; callers observe 0 at the end of
                // the response. Left unchanged so callers relying on the 0 return keep working.
                if (ret == -1)
                {
                    throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
                }

                return ret;
            }
            catch (IOException iox)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString(), iox);
            }
        }

        /// <summary>
        /// Appends bytes to the in-memory request buffer; nothing is sent until a flush.
        /// </summary>
        public override void Write(byte[] buf, int off, int len)
        {
            outputStream.Write(buf, off, len);
        }

#if !SILVERLIGHT
        /// <summary>
        /// Sends the buffered request synchronously and stores the response for
        /// <see cref="Read"/>. The request buffer is replaced with a fresh one whether or
        /// not the request succeeds.
        /// </summary>
        public override void Flush()
        {
            try
            {
                SendRequest();
            }
            finally
            {
                outputStream = new MemoryStream();
            }
        }

        /// <summary>
        /// Posts the buffered bytes, buffers the whole response in memory, and decodes
        /// it according to the response's Content-Encoding header.
        /// </summary>
        /// <exception cref="TTransportException">
        /// Wraps any <see cref="IOException"/> or <see cref="WebException"/>.
        /// </exception>
        private void SendRequest()
        {
            try
            {
                HttpWebRequest connection = CreateRequest();
                connection.Headers.Add("Accept-Encoding", "gzip, deflate");

                byte[] data = outputStream.ToArray();
                connection.ContentLength = data.Length;

                // Write and close the request body before asking for the response.
                using (Stream requestStream = connection.GetRequestStream())
                {
                    requestStream.Write(data, 0, data.Length);
                }

                // Resolve the HTTP hang that can happen after successive calls by making
                // sure the response and its stream are released: the body is copied to a
                // memory stream so both can be closed right away.
                using (var response = connection.GetResponse())
                {
                    Stream body = BufferResponse(response);

                    var encodings = response.Headers.GetValues("Content-Encoding");
                    if (encodings != null)
                    {
                        foreach (var encoding in encodings)
                        {
                            body = Decode(body, encoding);
                        }
                    }

                    ReplaceInputStream(body);
                }
            }
            catch (IOException iox)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString(), iox);
            }
            catch (WebException wx)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, "Couldn't connect to server: " + wx, wx);
            }
        }

        /// <summary>
        /// Decodes <paramref name="encoded"/> when <paramref name="encoding"/> names the
        /// gzip or deflate content coding; any other value returns the stream untouched.
        /// </summary>
        /// <returns>
        /// A new memory stream positioned at 0 holding the decoded bytes, or
        /// <paramref name="encoded"/> itself for unrecognised codings.
        /// </returns>
        private static Stream Decode(Stream encoded, string encoding)
        {
            // Content-coding tokens are case-insensitive and may carry surrounding whitespace.
            string coding = (encoding ?? string.Empty).Trim();

            Stream decoder;
            if (string.Equals(coding, "gzip", StringComparison.OrdinalIgnoreCase))
            {
                decoder = new GZipStream(encoded, CompressionMode.Decompress);
            }
            else if (string.Equals(coding, "deflate", StringComparison.OrdinalIgnoreCase))
            {
                decoder = new DeflateStream(encoded, CompressionMode.Decompress);
            }
            else
            {
                return encoded;
            }

            var decoded = new MemoryStream();
            // Disposing the decoder also disposes the wrapped (encoded) stream.
            using (decoder)
            {
                decoder.CopyTo(decoded);
            }
            decoded.Seek(0, SeekOrigin.Begin);
            return decoded;
        }
#endif

        /// <summary>
        /// Copies the whole body of <paramref name="response"/> into memory and closes
        /// the response stream.
        /// </summary>
        /// <returns>A memory stream positioned at 0 containing the response body.</returns>
        private static Stream BufferResponse(WebResponse response)
        {
            var buffered = new MemoryStream();
            using (var responseStream = response.GetResponseStream())
            {
                byte[] chunk = new byte[8192]; // multiple of 4096
                int bytesRead;
                while ((bytesRead = responseStream.Read(chunk, 0, chunk.Length)) > 0)
                {
                    buffered.Write(chunk, 0, bytesRead);
                }
            }
            buffered.Seek(0, SeekOrigin.Begin);
            return buffered;
        }

        /// <summary>
        /// Makes <paramref name="fresh"/> the stream served by <see cref="Read"/> and
        /// disposes the stream it replaces, if any.
        /// </summary>
        private void ReplaceInputStream(Stream fresh)
        {
            Stream stale = inputStream;
            inputStream = fresh;
            if (stale != null)
            {
                stale.Dispose();
            }
        }

        /// <summary>
        /// Builds a POST request for the configured URI carrying the Thrift content
        /// type, the custom headers and, outside Silverlight, the client certificates,
        /// timeouts, HTTP/1.0 protocol version and proxy.
        /// </summary>
        private HttpWebRequest CreateRequest()
        {
            HttpWebRequest connection = (HttpWebRequest)WebRequest.Create(uri);

#if !SILVERLIGHT
            // Adding certificates through code is not supported with WP7 Silverlight
            // see "Windows Phone 7 and Certificates_FINAL_121610.pdf"
            connection.ClientCertificates.AddRange(certificates);

            if (connectTimeout > 0)
            {
                connection.Timeout = connectTimeout;
            }
            if (readTimeout > 0)
            {
                connection.ReadWriteTimeout = readTimeout;
            }
#endif
            // Make the request
            connection.ContentType = "application/x-thrift";
            connection.Accept = "application/x-thrift";
            connection.UserAgent = "C#/THttpClient";
            connection.Method = "POST";
#if !SILVERLIGHT
            connection.ProtocolVersion = HttpVersion.Version10;
#endif

            // Add custom headers here
            foreach (KeyValuePair<string, string> item in customHeaders)
            {
#if !SILVERLIGHT
                connection.Headers.Add(item.Key, item.Value);
#else
                connection.Headers[item.Key] = item.Value;
#endif
            }

#if !SILVERLIGHT
            connection.Proxy = proxy;
#endif

            return connection;
        }

        /// <summary>
        /// Starts sending the buffered request asynchronously. Complete the operation
        /// with <see cref="EndFlush"/>, which is also where the request buffer is reset.
        /// </summary>
        /// <param name="callback">Invoked once the operation has finished; may be <c>null</c>.</param>
        /// <param name="state">Caller state exposed through <see cref="IAsyncResult.AsyncState"/>.</param>
        /// <exception cref="TTransportException">Wraps an <see cref="IOException"/> raised while starting the request.</exception>
        public override IAsyncResult BeginFlush(AsyncCallback callback, object state)
        {
            // Snapshot the request bytes; the buffer itself is reset in EndFlush.
            var data = outputStream.ToArray();

            try
            {
                // Create connection object
                var flushAsyncResult = new FlushAsyncResult(callback, state);
                flushAsyncResult.Connection = CreateRequest();
                flushAsyncResult.Data = data;

                flushAsyncResult.Connection.BeginGetRequestStream(GetRequestStreamCallback, flushAsyncResult);
                return flushAsyncResult;
            }
            catch (IOException iox)
            {
                throw new TTransportException(iox.ToString(), iox);
            }
        }

        /// <summary>
        /// Waits (without timeout) for the operation started by <see cref="BeginFlush"/>
        /// and rethrows any failure it recorded. The request buffer is replaced with a
        /// fresh one in all cases.
        /// </summary>
        /// <param name="asyncResult">The value returned by <see cref="BeginFlush"/>.</param>
        public override void EndFlush(IAsyncResult asyncResult)
        {
            try
            {
                var flushAsyncResult = (FlushAsyncResult)asyncResult;

                if (!flushAsyncResult.IsCompleted)
                {
                    var waitHandle = flushAsyncResult.AsyncWaitHandle;
                    waitHandle.WaitOne(); // blocks indefinitely
                    waitHandle.Close();
                }

                if (flushAsyncResult.AsyncException != null)
                {
                    throw flushAsyncResult.AsyncException;
                }
            }
            finally
            {
                outputStream = new MemoryStream();
            }
        }

        /// <summary>
        /// Async step 1: writes the request body, then starts waiting for the response.
        /// Any failure is recorded on the async result and completes the operation.
        /// </summary>
        private void GetRequestStreamCallback(IAsyncResult asynchronousResult)
        {
            var flushAsyncResult = (FlushAsyncResult)asynchronousResult.AsyncState;
            try
            {
                // 'using' closes the request stream even when the write fails.
                using (var reqStream = flushAsyncResult.Connection.EndGetRequestStream(asynchronousResult))
                {
                    reqStream.Write(flushAsyncResult.Data, 0, flushAsyncResult.Data.Length);
                    reqStream.Flush();
                }

                // Start the asynchronous operation to get the response
                flushAsyncResult.Connection.BeginGetResponse(GetResponseCallback, flushAsyncResult);
            }
            catch (Exception exception)
            {
                flushAsyncResult.AsyncException = new TTransportException(exception.ToString(), exception);
                flushAsyncResult.UpdateStatusToComplete();
                flushAsyncResult.NotifyCallbackWhenAvailable();
            }
        }

        /// <summary>
        /// Async step 2: buffers the response body in memory, releases the response,
        /// and completes the operation (recording any failure on the async result).
        /// </summary>
        private void GetResponseCallback(IAsyncResult asynchronousResult)
        {
            var flushAsyncResult = (FlushAsyncResult)asynchronousResult.AsyncState;
            try
            {
                WebResponse response = flushAsyncResult.Connection.EndGetResponse(asynchronousResult);
                try
                {
                    // Same approach as the synchronous path: copy the body so the
                    // response can be closed immediately instead of staying open
                    // until the next request.
                    ReplaceInputStream(BufferResponse(response));
                }
                finally
                {
                    response.Close();
                }
            }
            catch (Exception exception)
            {
                flushAsyncResult.AsyncException = new TTransportException(exception.ToString(), exception);
            }
            flushAsyncResult.UpdateStatusToComplete();
            flushAsyncResult.NotifyCallbackWhenAvailable();
        }

        // Based on http://msmvps.com/blogs/luisabreu/archive/2009/06/15/multithreading-implementing-the-iasyncresult-interface.aspx
        /// <summary>
        /// <see cref="IAsyncResult"/> for an asynchronous flush; carries the request,
        /// its payload and any failure between the callbacks and <c>EndFlush</c>.
        /// </summary>
        private sealed class FlushAsyncResult : IAsyncResult
        {
            private volatile bool _isCompleted;
            private ManualResetEvent _evt;
            private readonly AsyncCallback _cbMethod;
            private readonly object _state;
            private readonly object _locker = new object();

            public FlushAsyncResult(AsyncCallback cbMethod, object state)
            {
                _cbMethod = cbMethod;
                _state = state;
            }

            internal byte[] Data { get; set; }
            internal HttpWebRequest Connection { get; set; }
            internal TTransportException AsyncException { get; set; }

            public object AsyncState
            {
                get { return _state; }
            }

            public WaitHandle AsyncWaitHandle
            {
                get { return GetEvtHandle(); }
            }

            public bool CompletedSynchronously
            {
                get { return false; }
            }

            public bool IsCompleted
            {
                get { return _isCompleted; }
            }

            /// <summary>
            /// Creates the wait handle on first use and signals it if the operation
            /// has already completed.
            /// </summary>
            private ManualResetEvent GetEvtHandle()
            {
                lock (_locker)
                {
                    if (_evt == null)
                    {
                        _evt = new ManualResetEvent(false);
                    }
                    if (_isCompleted)
                    {
                        _evt.Set();
                    }
                }
                return _evt;
            }

            /// <summary>
            /// Marks the operation complete and signals the wait handle if one exists.
            /// </summary>
            internal void UpdateStatusToComplete()
            {
                _isCompleted = true; // 1. set _isCompleted to true
                lock (_locker)
                {
                    if (_evt != null)
                    {
                        _evt.Set(); // 2. set the event, when it exists
                    }
                }
            }

            /// <summary>
            /// Invokes the caller's callback, if one was supplied.
            /// </summary>
            internal void NotifyCallbackWhenAvailable()
            {
                if (_cbMethod != null)
                {
                    _cbMethod(this);
                }
            }
        }

        #region " IDisposable Support "
        private bool _IsDisposed;

        // IDisposable
        /// <summary>
        /// Disposes the response and request buffers on the first disposing call.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            if (!_IsDisposed)
            {
                if (disposing)
                {
                    if (inputStream != null)
                    {
                        inputStream.Dispose();
                    }
                    if (outputStream != null)
                    {
                        outputStream.Dispose();
                    }
                }
            }
            _IsDisposed = true;
        }
        #endregion
    }
}