// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;

namespace Thrift.Transport.Client
{
    /// <summary>
    /// Client transport that collects written bytes in an in-memory buffer and, on
    /// <see cref="FlushAsync"/>, sends them as the body of one HTTP POST. The body of the HTTP
    /// response then becomes the stream served by <see cref="ReadAsync"/>.
    /// </summary>
    // ReSharper disable once InconsistentNaming
    public class THttpTransport : TEndpointTransport
    {
        private readonly X509Certificate[] _certificates;
        private readonly Uri _uri;

        private int _connectTimeout = 30000; // Timeouts in milliseconds
        private HttpClient _httpClient;
        private Stream _inputStream;
        private MemoryStream _outputStream = new MemoryStream();
        private bool _isDisposed;

        /// <summary>
        /// Creates a transport that posts to <paramref name="uri"/> using an internally created
        /// <c>HttpClient</c> without client certificates.
        /// </summary>
        /// <param name="uri">Target URI of the POST requests.</param>
        /// <param name="config">Thrift configuration object</param>
        /// <param name="customRequestHeaders">Optional headers added to every request.</param>
        /// <param name="userAgent">Optional user agent; the default is used when null or empty.</param>
        public THttpTransport(Uri uri, TConfiguration config, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)
            : this(uri, config, Enumerable.Empty<X509Certificate>(), customRequestHeaders, userAgent)
        {
        }

        /// <summary>
        /// Creates a transport that posts to <paramref name="uri"/> using an internally created
        /// <c>HttpClient</c> whose handler carries the given client certificates.
        /// </summary>
        /// <param name="uri">Target URI of the POST requests.</param>
        /// <param name="config">Thrift configuration object</param>
        /// <param name="certificates">Client certificates; <c>null</c> is treated as empty.</param>
        /// <param name="customRequestHeaders">Headers added to every request; may be <c>null</c>.</param>
        /// <param name="userAgent">Optional user agent; the default is used when null or empty.</param>
        public THttpTransport(Uri uri, TConfiguration config, IEnumerable<X509Certificate> certificates,
            IDictionary<string, string> customRequestHeaders, string userAgent = null)
            : base(config)
        {
            _uri = uri;
            _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();

            if (!string.IsNullOrEmpty(userAgent))
            {
                UserAgent = userAgent;
            }

            // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809
            // this can be switched to default way (create client->use->dispose per flush) later
            _httpClient = CreateClient(customRequestHeaders);
            ConfigureClient(_httpClient);
        }

        /// <summary>
        /// Constructor that takes a <c>HttpClient</c> instance to support using <c>IHttpClientFactory</c>.
        /// </summary>
        /// <remarks>As the <c>HttpMessageHandler</c> of the client must be configured at the time of creation, it
        /// is assumed that the consumer has already added any certificates and configured decompression methods. The
        /// consumer can use the <c>CreateHttpClientHandler</c> method to get a handler with these set.</remarks>
        /// <param name="httpClient">Client configured with the desired message handler, user agent, and URI if not
        /// specified in the <c>uri</c> parameter. A default user agent will be used if not set.</param>
        /// <param name="config">Thrift configuration object</param>
        /// <param name="uri">Optional URI to use for requests, if not specified the base address of <c>httpClient</c>
        /// is used.</param>
        /// <exception cref="ArgumentNullException"><paramref name="httpClient"/> is <c>null</c>.</exception>
        public THttpTransport(HttpClient httpClient, TConfiguration config, Uri uri = null)
            : base(config)
        {
            if (httpClient == null)
            {
                throw new ArgumentNullException(nameof(httpClient));
            }

            _httpClient = httpClient;
            _uri = uri ?? httpClient.BaseAddress;

            // HttpClient refuses property writes once it has started sending requests, so do not
            // touch BaseAddress when the assignment would not change its value.
            if (httpClient.BaseAddress != _uri)
            {
                httpClient.BaseAddress = _uri;
            }

            var userAgent = _httpClient.DefaultRequestHeaders.UserAgent.ToString();
            if (!string.IsNullOrEmpty(userAgent))
            {
                UserAgent = userAgent;
            }

            ConfigureClient(_httpClient);
        }

        // According to RFC 2616 section 3.8, the "User-Agent" header may not carry a version number
        public readonly string UserAgent = "Thrift netstd THttpClient";

        /// <summary>Always <c>true</c>; this transport has no separate open state.</summary>
        public override bool IsOpen => true;

        /// <summary>Default request headers of the underlying <c>HttpClient</c>.</summary>
        public HttpRequestHeaders RequestHeaders => _httpClient.DefaultRequestHeaders;

        /// <summary>
        /// Content type of the POST body; <c>application/x-thrift</c> is used when this is <c>null</c>.
        /// </summary>
        public MediaTypeHeaderValue ContentType { get; set; }

        /// <summary>
        /// Does nothing apart from honoring <paramref name="cancellationToken"/>.
        /// </summary>
        public override Task OpenAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Disposes and clears the response stream, the pending output buffer and the
        /// <c>HttpClient</c> (this includes a client that was passed in through the constructor).
        /// </summary>
        public override void Close()
        {
            if (_inputStream != null)
            {
                _inputStream.Dispose();
                _inputStream = null;
            }

            if (_outputStream != null)
            {
                _outputStream.Dispose();
                _outputStream = null;
            }

            if (_httpClient != null)
            {
                _httpClient.Dispose();
                _httpClient = null;
            }
        }

        /// <summary>
        /// Reads up to <paramref name="length"/> bytes of the last HTTP response into
        /// <paramref name="buffer"/> starting at <paramref name="offset"/>.
        /// </summary>
        /// <returns>The number of bytes read.</returns>
        /// <exception cref="TTransportException">No response stream is available yet, or the
        /// underlying stream reported an <see cref="IOException"/>.</exception>
        public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            if (_inputStream == null)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
            }

            CheckReadBytesAvailable(length);

            try
            {
#if NETSTANDARD2_1
                var ret = await _inputStream.ReadAsync(new Memory<byte>(buffer, offset, length), cancellationToken);
#else
                var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);
#endif
                // NOTE(review): Stream.ReadAsync signals end of stream with 0, not -1, so this guard
                // is not expected to fire; a 0 result is handed back to the caller unchanged.
                if (ret == -1)
                {
                    throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
                }

                CountConsumedMessageBytes(ret);
                return ret;
            }
            catch (IOException iox)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
            }
        }

        /// <summary>
        /// Appends the given bytes to the in-memory request buffer; nothing is sent until
        /// <see cref="FlushAsync"/> is called.
        /// </summary>
        public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            await _outputStream.WriteAsync(buffer, offset, length, cancellationToken);
        }

        /// <summary>
        /// Get a client handler configured with recommended properties to use with the <c>HttpClient</c> constructor
        /// and an <c>IHttpClientFactory</c>.
        /// </summary>
        /// <param name="certificates">An optional array of client certificates to associate with the handler.</param>
        /// <returns>
        /// A client handler with deflate and gZip compression-decompression algorithms and any client
        /// certificates passed in via <c>certificates</c>.
        /// </returns>
        public virtual HttpClientHandler CreateHttpClientHandler(X509Certificate[] certificates = null)
        {
            var handler = new HttpClientHandler();
            if (certificates != null)
            {
                handler.ClientCertificates.AddRange(certificates);
            }

            handler.AutomaticDecompression = System.Net.DecompressionMethods.Deflate | System.Net.DecompressionMethods.GZip;
            return handler;
        }

        /// <summary>
        /// Builds the internally owned <c>HttpClient</c> and applies the caller's custom headers.
        /// </summary>
        private HttpClient CreateClient(IDictionary<string, string> customRequestHeaders)
        {
            var handler = CreateHttpClientHandler(_certificates);
            var httpClient = new HttpClient(handler);

            if (customRequestHeaders != null)
            {
                foreach (var item in customRequestHeaders)
                {
                    httpClient.DefaultRequestHeaders.Add(item.Key, item.Value);
                }
            }

            return httpClient;
        }

        /// <summary>
        /// Applies the timeout, Accept, User-Agent and Accept-Encoding defaults to the client.
        /// </summary>
        private void ConfigureClient(HttpClient httpClient)
        {
            if (_connectTimeout > 0)
            {
                var timeout = TimeSpan.FromMilliseconds(_connectTimeout);

                // Same reasoning as for BaseAddress: avoid a write that changes nothing, because
                // HttpClient rejects property writes after its first request.
                if (httpClient.Timeout != timeout)
                {
                    httpClient.Timeout = timeout;
                }
            }

            // A caller-supplied client may already carry these values; do not repeat them.
            AddIfMissing(httpClient.DefaultRequestHeaders.Accept, new MediaTypeWithQualityHeaderValue("application/x-thrift"));

            // Clear any user agent values to avoid drift with the field value
            httpClient.DefaultRequestHeaders.UserAgent.Clear();
            httpClient.DefaultRequestHeaders.UserAgent.TryParseAdd(UserAgent);

            AddIfMissing(httpClient.DefaultRequestHeaders.AcceptEncoding, new StringWithQualityHeaderValue("deflate"));
            AddIfMissing(httpClient.DefaultRequestHeaders.AcceptEncoding, new StringWithQualityHeaderValue("gzip"));
        }

        /// <summary>Adds <paramref name="value"/> to the header collection unless an equal value is present.</summary>
        private static void AddIfMissing<T>(HttpHeaderValueCollection<T> headerValues, T value) where T : class
        {
            if (!headerValues.Contains(value))
            {
                headerValues.Add(value);
            }
        }

        /// <summary>
        /// Posts everything written since the previous flush and keeps the response body as the
        /// new input stream. The output buffer is replaced by a fresh one whether or not the
        /// request succeeds.
        /// </summary>
        /// <exception cref="TTransportException">Any failure while sending the request or obtaining
        /// the response stream.</exception>
        public override async Task FlushAsync(CancellationToken cancellationToken)
        {
            try
            {
                // Rewind so the request body starts at the first buffered byte.
                _outputStream.Seek(0, SeekOrigin.Begin);

                using (var contentStream = new StreamContent(_outputStream))
                {
                    contentStream.Headers.ContentType = ContentType ?? new MediaTypeHeaderValue(@"application/x-thrift");

                    var response = (await _httpClient.PostAsync(_uri, contentStream, cancellationToken)).EnsureSuccessStatusCode();

                    // Drop the previous response before switching to the new one.
                    _inputStream?.Dispose();
                    _inputStream = await response.Content.ReadAsStreamAsync();
                    if (_inputStream.CanSeek)
                    {
                        _inputStream.Seek(0, SeekOrigin.Begin);
                    }
                }
            }
            catch (IOException iox)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
            }
            catch (HttpRequestException wx)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown,
                    "Couldn't connect to server: " + wx);
            }
            catch (Exception ex)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, ex.Message);
            }
            finally
            {
                _outputStream = new MemoryStream();
                ResetConsumedMessageSize();
            }
        }


        // IDisposable
        /// <summary>
        /// Releases the streams and the <c>HttpClient</c> on the first call with
        /// <paramref name="disposing"/> set; later calls do nothing.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            if (!_isDisposed)
            {
                if (disposing)
                {
                    _inputStream?.Dispose();
                    _outputStream?.Dispose();
                    _httpClient?.Dispose();
                }
            }
            _isDisposed = true;
        }
    }
}