// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;

namespace Thrift.Transport.Client
{
    /// <summary>
    /// Client-side transport that buffers written bytes in memory and, on flush, sends them
    /// to a fixed URI as the body of an HTTP POST. The body of the HTTP response becomes the
    /// data served by subsequent reads.
    /// </summary>
    // ReSharper disable once InconsistentNaming
    public class THttpTransport : TTransport
    {
        private readonly X509Certificate[] _certificates;
        private readonly Uri _uri;

        private int _connectTimeout = 30000; // Timeouts in milliseconds
        private HttpClient _httpClient;
        private Stream _inputStream;
        private MemoryStream _outputStream = new MemoryStream();
        private bool _isDisposed;

        /// <summary>
        /// Creates a transport that posts to <paramref name="uri"/> without client certificates.
        /// </summary>
        /// <param name="uri">Target of every HTTP POST issued by <see cref="FlushAsync"/>.</param>
        /// <param name="customRequestHeaders">Optional headers added to the client's default request headers.</param>
        /// <param name="userAgent">Optional replacement for the default <see cref="UserAgent"/> value.</param>
        public THttpTransport(Uri uri, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)
            : this(uri, Enumerable.Empty<X509Certificate>(), customRequestHeaders, userAgent)
        {
        }

        /// <summary>
        /// Creates a transport that posts to <paramref name="uri"/>, offering the given client certificates.
        /// </summary>
        /// <param name="uri">Target of every HTTP POST issued by <see cref="FlushAsync"/>.</param>
        /// <param name="certificates">Client certificates for the HTTP handler; <c>null</c> is treated as empty.</param>
        /// <param name="customRequestHeaders">Optional headers added to the client's default request headers.</param>
        /// <param name="userAgent">Optional replacement for the default <see cref="UserAgent"/> value.</param>
        public THttpTransport(Uri uri, IEnumerable<X509Certificate> certificates,
            IDictionary<string, string> customRequestHeaders, string userAgent = null)
        {
            _uri = uri;
            _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();

            if (!string.IsNullOrEmpty(userAgent))
            {
                UserAgent = userAgent;
            }

            // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809
            // this can be switched to default way (create client->use->dispose per flush) later
            _httpClient = CreateClient(customRequestHeaders);
        }

        // According to RFC 2616 section 3.8, the "User-Agent" header may not carry a version number
        public readonly string UserAgent = "Thrift netstd THttpClient";

        /// <summary>Always reports <c>true</c>; this transport does not track an open/closed state here.</summary>
        public override bool IsOpen => true;

        /// <summary>Default request headers of the underlying <see cref="HttpClient"/>.</summary>
        public HttpRequestHeaders RequestHeaders => _httpClient.DefaultRequestHeaders;

        /// <summary>
        /// Content type sent with each POST. When <c>null</c>, <see cref="FlushAsync"/> uses
        /// <c>application/x-thrift</c>.
        /// </summary>
        public MediaTypeHeaderValue ContentType { get; set; }

        /// <summary>
        /// Performs no connection work; only observes <paramref name="cancellationToken"/>.
        /// </summary>
        /// <param name="cancellationToken">Token that, when already cancelled, makes the call fault as cancelled.</param>
        public override async Task OpenAsync(CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                await Task.FromCanceled(cancellationToken);
            }
        }

        /// <summary>
        /// Disposes and clears the response stream, the pending request buffer and the HTTP client.
        /// Safe to call more than once.
        /// </summary>
        public override void Close()
        {
            if (_inputStream != null)
            {
                _inputStream.Dispose();
                _inputStream = null;
            }

            if (_outputStream != null)
            {
                _outputStream.Dispose();
                _outputStream = null;
            }

            if (_httpClient != null)
            {
                _httpClient.Dispose();
                _httpClient = null;
            }
        }

        /// <summary>
        /// Reads from the body of the most recent HTTP response.
        /// </summary>
        /// <param name="buffer">Destination buffer.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
        /// <param name="length">Maximum number of bytes to read.</param>
        /// <param name="cancellationToken">Token used to cancel the read.</param>
        /// <returns>The count reported by the response stream's read.</returns>
        /// <exception cref="TTransportException">
        /// <c>NotOpen</c> when no response stream is available (nothing has been flushed yet, or the
        /// transport was closed); <c>Unknown</c> when the underlying read raises an <see cref="IOException"/>.
        /// </exception>
        public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                return await Task.FromCanceled<int>(cancellationToken);
            }

            if (_inputStream == null)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
            }

            try
            {
                var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);

                // NOTE(review): Stream.ReadAsync is documented to signal end of stream by returning 0,
                // not -1, so this branch looks unreachable. Left unchanged because wrapping transports
                // may depend on receiving 0 here - confirm before tightening the check.
                if (ret == -1)
                {
                    throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
                }

                return ret;
            }
            catch (IOException iox)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
            }
        }

        /// <summary>
        /// Appends bytes to the in-memory request buffer; nothing is sent until <see cref="FlushAsync"/>.
        /// </summary>
        /// <param name="buffer">Source buffer.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to append.</param>
        /// <param name="length">Number of bytes to append.</param>
        /// <param name="cancellationToken">Token used to cancel the write.</param>
        /// <exception cref="TTransportException"><c>NotOpen</c> when the transport has been closed.</exception>
        public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                await Task.FromCanceled(cancellationToken);
            }

            // Close() clears the buffer; report that explicitly instead of a NullReferenceException.
            if (_outputStream == null)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Transport has been closed");
            }

            await _outputStream.WriteAsync(buffer, offset, length, cancellationToken);
        }

        /// <summary>
        /// Builds the <see cref="HttpClient"/> used for all requests: client certificates,
        /// gzip/deflate decompression, timeout, Accept / User-Agent / Accept-Encoding headers
        /// and any caller-supplied headers.
        /// </summary>
        /// <param name="customRequestHeaders">Optional extra default request headers; may be <c>null</c>.</param>
        /// <returns>The configured client.</returns>
        private HttpClient CreateClient(IDictionary<string, string> customRequestHeaders)
        {
            var handler = new HttpClientHandler();
            HttpClient httpClient = null;

            try
            {
                handler.ClientCertificates.AddRange(_certificates);
                handler.AutomaticDecompression = System.Net.DecompressionMethods.Deflate | System.Net.DecompressionMethods.GZip;

                httpClient = new HttpClient(handler);

                if (_connectTimeout > 0)
                {
                    httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout);
                }

                httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift"));
                httpClient.DefaultRequestHeaders.UserAgent.TryParseAdd(UserAgent);

                httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("deflate"));
                httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip"));

                if (customRequestHeaders != null)
                {
                    foreach (var item in customRequestHeaders)
                    {
                        httpClient.DefaultRequestHeaders.Add(item.Key, item.Value);
                    }
                }

                return httpClient;
            }
            catch
            {
                // Configuration failed (for example an invalid custom header): release what was
                // created so the handler/client are not leaked, then let the original error surface.
                httpClient?.Dispose();
                handler.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Posts everything written since the previous flush to the configured URI and makes the
        /// response body available to <see cref="ReadAsync"/>. The request buffer is replaced with
        /// a fresh one whether or not the request succeeds.
        /// </summary>
        /// <param name="cancellationToken">Token passed to the HTTP POST.</param>
        /// <exception cref="TTransportException">
        /// <c>NotOpen</c> when the transport has been closed; <c>Unknown</c> for any failure raised
        /// while sending the request or obtaining the response stream.
        /// </exception>
        public override async Task FlushAsync(CancellationToken cancellationToken)
        {
            // Close() clears both members; fail with a clear transport error instead of relying on
            // a NullReferenceException being translated by the catch-all below.
            if (_httpClient == null || _outputStream == null)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Transport has been closed");
            }

            try
            {
                _outputStream.Seek(0, SeekOrigin.Begin);

                using (var contentStream = new StreamContent(_outputStream))
                {
                    contentStream.Headers.ContentType = ContentType ?? new MediaTypeHeaderValue(@"application/x-thrift");

                    var response = (await _httpClient.PostAsync(_uri, contentStream, cancellationToken)).EnsureSuccessStatusCode();

                    // Drop the previous response body before exposing the new one.
                    _inputStream?.Dispose();
                    _inputStream = await response.Content.ReadAsStreamAsync();
                    if (_inputStream.CanSeek)
                    {
                        _inputStream.Seek(0, SeekOrigin.Begin);
                    }
                }
            }
            catch (IOException iox)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
            }
            catch (HttpRequestException wx)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown,
                    "Couldn't connect to server: " + wx);
            }
            catch (Exception ex)
            {
                throw new TTransportException(TTransportException.ExceptionType.Unknown, ex.Message);
            }
            finally
            {
                // Start the next request from an empty buffer.
                _outputStream = new MemoryStream();
            }
        }

        // IDisposable
        /// <summary>
        /// Releases the response stream, request buffer and HTTP client on the first disposing call.
        /// </summary>
        /// <param name="disposing"><c>true</c> to release the managed resources held by this instance.</param>
        protected override void Dispose(bool disposing)
        {
            if (!_isDisposed)
            {
                if (disposing)
                {
                    _inputStream?.Dispose();
                    _outputStream?.Dispose();
                    _httpClient?.Dispose();
                }
            }
            _isDisposed = true;
        }
    }
}