1 // Licensed to the Apache Software Foundation(ASF) under one 2 // or more contributor license agreements.See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership.The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 using System; 19 using System.Collections.Generic; 20 using System.IO; 21 using System.Linq; 22 using System.Net.Http; 23 using System.Net.Http.Headers; 24 using System.Security.Cryptography.X509Certificates; 25 using System.Threading; 26 using System.Threading.Tasks; 27 28 namespace Thrift.Transports.Client 29 { 30 // ReSharper disable once InconsistentNaming 31 public class THttpClientTransport : TClientTransport 32 { 33 private readonly X509Certificate[] _certificates; 34 private readonly Uri _uri; 35 36 // Timeouts in milliseconds 37 private int _connectTimeout = 30000; 38 private HttpClient _httpClient; 39 private Stream _inputStream; 40 41 private bool _isDisposed; 42 private MemoryStream _outputStream = new MemoryStream(); 43 THttpClientTransport(Uri u, IDictionary<string, string> customHeaders)44 public THttpClientTransport(Uri u, IDictionary<string, string> customHeaders) 45 : this(u, Enumerable.Empty<X509Certificate>(), customHeaders) 46 { 47 } 48 THttpClientTransport(Uri u, IEnumerable<X509Certificate> certificates, IDictionary<string, string> customHeaders)49 public THttpClientTransport(Uri u, IEnumerable<X509Certificate> certificates, 50 IDictionary<string, string> customHeaders) 51 { 52 _uri = u; 53 _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray(); 54 CustomHeaders = customHeaders; 55 56 // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809 57 // this can be switched to default way (create client->use->dispose per flush) later 58 _httpClient = CreateClient(); 59 } 60 61 public IDictionary<string, string> CustomHeaders { get; } 62 63 public int ConnectTimeout 64 { 65 set { _connectTimeout = value; } 66 } 67 68 public override bool IsOpen => true; 69 OpenAsync(CancellationToken cancellationToken)70 public override async Task OpenAsync(CancellationToken cancellationToken) 71 { 72 if (cancellationToken.IsCancellationRequested) 73 { 74 await Task.FromCanceled(cancellationToken); 75 } 76 } 77 Close()78 public override void Close() 79 { 80 if (_inputStream != null) 81 { 82 _inputStream.Dispose(); 83 _inputStream = null; 84 } 85 86 if (_outputStream != null) 87 { 88 _outputStream.Dispose(); 89 _outputStream = null; 90 } 91 92 if (_httpClient != null) 93 { 94 _httpClient.Dispose(); 95 _httpClient = null; 96 } 97 } 98 ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)99 public override async Task<int> ReadAsync(byte[] buffer, int offset, int length, 100 CancellationToken cancellationToken) 101 { 102 if (cancellationToken.IsCancellationRequested) 103 { 104 return await Task.FromCanceled<int>(cancellationToken); 105 } 106 107 if (_inputStream == null) 108 { 109 throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent"); 110 } 111 112 try 113 { 114 var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken); 115 116 if (ret == -1) 117 { 118 throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available"); 119 } 120 121 return ret; 122 } 123 catch (IOException iox) 124 { 125 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString()); 126 } 127 } 128 WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)129 public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) 130 { 131 if (cancellationToken.IsCancellationRequested) 132 { 133 await Task.FromCanceled(cancellationToken); 134 } 135 136 await _outputStream.WriteAsync(buffer, offset, length, cancellationToken); 137 } 138 CreateClient()139 private HttpClient CreateClient() 140 { 141 var handler = new HttpClientHandler(); 142 handler.ClientCertificates.AddRange(_certificates); 143 144 var httpClient = new HttpClient(handler); 145 146 if (_connectTimeout > 0) 147 { 148 httpClient.Timeout = TimeSpan.FromSeconds(_connectTimeout); 149 } 150 151 httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift")); 152 httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("THttpClientTransport", "0.12.0")); 153 154 if (CustomHeaders != null) 155 { 156 foreach (var item in CustomHeaders) 157 { 158 httpClient.DefaultRequestHeaders.Add(item.Key, item.Value); 159 } 160 } 161 162 return httpClient; 163 } 164 FlushAsync(CancellationToken cancellationToken)165 public override async Task FlushAsync(CancellationToken cancellationToken) 166 { 167 try 168 { 169 try 170 { 171 if (_outputStream.CanSeek) 172 { 173 _outputStream.Seek(0, SeekOrigin.Begin); 174 } 175 176 using (var outStream = new StreamContent(_outputStream)) 177 { 178 var msg = await _httpClient.PostAsync(_uri, outStream, cancellationToken); 179 180 msg.EnsureSuccessStatusCode(); 181 182 if (_inputStream != null) 183 { 184 _inputStream.Dispose(); 185 _inputStream = null; 186 } 187 188 _inputStream = await msg.Content.ReadAsStreamAsync(); 189 if (_inputStream.CanSeek) 190 { 191 _inputStream.Seek(0, SeekOrigin.Begin); 192 } 193 } 194 } 195 catch (IOException iox) 196 { 197 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString()); 198 } 199 catch (HttpRequestException wx) 200 { 201 throw new TTransportException(TTransportException.ExceptionType.Unknown, 202 "Couldn't connect to server: " + wx); 203 } 204 } 205 finally 206 { 207 _outputStream = new MemoryStream(); 208 } 209 } 210 211 // IDisposable Dispose(bool disposing)212 protected override void Dispose(bool disposing) 213 { 214 if (!_isDisposed) 215 { 216 if (disposing) 217 { 218 _inputStream?.Dispose(); 219 _outputStream?.Dispose(); 220 _httpClient?.Dispose(); 221 } 222 } 223 _isDisposed = true; 224 } 225 } 226 } 227