// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Collections.Generic;
20 using System.IO;
21 using System.Linq;
22 using System.Net.Http;
23 using System.Net.Http.Headers;
24 using System.Security.Cryptography.X509Certificates;
25 using System.Threading;
26 using System.Threading.Tasks;
27 
28 namespace Thrift.Transport.Client
29 {
30     // ReSharper disable once InconsistentNaming
31     public class THttpTransport : TEndpointTransport
32     {
33         private readonly X509Certificate[] _certificates;
34         private readonly Uri _uri;
35 
36         private int _connectTimeout = 30000; // Timeouts in milliseconds
37         private HttpClient _httpClient;
38         private Stream _inputStream;
39         private MemoryStream _outputStream = new MemoryStream();
40         private bool _isDisposed;
41 
THttpTransport(Uri uri, TConfiguration config, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)42         public THttpTransport(Uri uri, TConfiguration config, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)
43             : this(uri, config, Enumerable.Empty<X509Certificate>(), customRequestHeaders, userAgent)
44         {
45         }
46 
THttpTransport(Uri uri, TConfiguration config, IEnumerable<X509Certificate> certificates, IDictionary<string, string> customRequestHeaders, string userAgent = null)47         public THttpTransport(Uri uri, TConfiguration config, IEnumerable<X509Certificate> certificates,
48             IDictionary<string, string> customRequestHeaders, string userAgent = null)
49             : base(config)
50         {
51             _uri = uri;
52             _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();
53 
54             if (!string.IsNullOrEmpty(userAgent))
55                 UserAgent = userAgent;
56 
57             // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809
58             // this can be switched to default way (create client->use->dispose per flush) later
59             _httpClient = CreateClient(customRequestHeaders);
60             ConfigureClient(_httpClient);
61         }
62 
63         /// <summary>
64         /// Constructor that takes a <c>HttpClient</c> instance to support using <c>IHttpClientFactory</c>.
65         /// </summary>
66         /// <remarks>As the <c>HttpMessageHandler</c> of the client must be configured at the time of creation, it
67         /// is assumed that the consumer has already added any certificates and configured decompression methods. The
68         /// consumer can use the <c>CreateHttpClientHandler</c> method to get a handler with these set.</remarks>
69         /// <param name="httpClient">Client configured with the desired message handler, user agent, and URI if not
70         /// specified in the <c>uri</c> parameter. A default user agent will be used if not set.</param>
71         /// <param name="config">Thrift configuration object</param>
72         /// <param name="uri">Optional URI to use for requests, if not specified the base address of <c>httpClient</c>
73         /// is used.</param>
THttpTransport(HttpClient httpClient, TConfiguration config, Uri uri = null)74         public THttpTransport(HttpClient httpClient, TConfiguration config, Uri uri = null)
75             : base(config)
76         {
77             _httpClient = httpClient;
78 
79             _uri = uri ?? httpClient.BaseAddress;
80             httpClient.BaseAddress = _uri;
81 
82             var userAgent = _httpClient.DefaultRequestHeaders.UserAgent.ToString();
83             if (!string.IsNullOrEmpty(userAgent))
84                 UserAgent = userAgent;
85 
86             ConfigureClient(_httpClient);
87         }
88 
89         // According to RFC 2616 section 3.8, the "User-Agent" header may not carry a version number
90         public readonly string UserAgent = "Thrift netstd THttpClient";
91 
92         public override bool IsOpen => true;
93 
94         public HttpRequestHeaders RequestHeaders => _httpClient.DefaultRequestHeaders;
95 
96         public MediaTypeHeaderValue ContentType { get; set; }
97 
OpenAsync(CancellationToken cancellationToken)98         public override Task OpenAsync(CancellationToken cancellationToken)
99         {
100             cancellationToken.ThrowIfCancellationRequested();
101             return Task.CompletedTask;
102         }
103 
Close()104         public override void Close()
105         {
106             if (_inputStream != null)
107             {
108                 _inputStream.Dispose();
109                 _inputStream = null;
110             }
111 
112             if (_outputStream != null)
113             {
114                 _outputStream.Dispose();
115                 _outputStream = null;
116             }
117 
118             if (_httpClient != null)
119             {
120                 _httpClient.Dispose();
121                 _httpClient = null;
122             }
123         }
124 
ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)125         public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
126         {
127             cancellationToken.ThrowIfCancellationRequested();
128 
129             if (_inputStream == null)
130                 throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
131 
132             CheckReadBytesAvailable(length);
133 
134             try
135             {
136 #if NETSTANDARD2_1
137                 var ret = await _inputStream.ReadAsync(new Memory<byte>(buffer, offset, length), cancellationToken);
138 #else
139                 var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);
140 #endif
141                 if (ret == -1)
142                 {
143                     throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
144                 }
145 
146                 CountConsumedMessageBytes(ret);
147                 return ret;
148             }
149             catch (IOException iox)
150             {
151                 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
152             }
153         }
154 
WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)155         public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
156         {
157             cancellationToken.ThrowIfCancellationRequested();
158 
159             await _outputStream.WriteAsync(buffer, offset, length, cancellationToken);
160         }
161 
162         /// <summary>
163         /// Get a client handler configured with recommended properties to use with the <c>HttpClient</c> constructor
164         /// and an <c>IHttpClientFactory</c>.
165         /// </summary>
166         /// <param name="certificates">An optional array of client certificates to associate with the handler.</param>
167         /// <returns>
168         /// A client handler with deflate and gZip compression-decompression algorithms and any client
169         /// certificates passed in via <c>certificates</c>.
170         /// </returns>
CreateHttpClientHandler(X509Certificate[] certificates = null)171         public virtual HttpClientHandler CreateHttpClientHandler(X509Certificate[] certificates = null)
172         {
173             var handler = new HttpClientHandler();
174             if (certificates != null)
175                 handler.ClientCertificates.AddRange(certificates);
176             handler.AutomaticDecompression = System.Net.DecompressionMethods.Deflate | System.Net.DecompressionMethods.GZip;
177             return handler;
178         }
179 
CreateClient(IDictionary<string, string> customRequestHeaders)180         private HttpClient CreateClient(IDictionary<string, string> customRequestHeaders)
181         {
182             var handler = CreateHttpClientHandler(_certificates);
183             var httpClient = new HttpClient(handler);
184 
185 
186             if (customRequestHeaders != null)
187             {
188                 foreach (var item in customRequestHeaders)
189                 {
190                     httpClient.DefaultRequestHeaders.Add(item.Key, item.Value);
191                 }
192             }
193 
194             return httpClient;
195         }
196 
ConfigureClient(HttpClient httpClient)197         private void ConfigureClient(HttpClient httpClient)
198         {
199             if (_connectTimeout > 0)
200             {
201                 httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout);
202             }
203 
204             httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift"));
205 
206             // Clear any user agent values to avoid drift with the field value
207             httpClient.DefaultRequestHeaders.UserAgent.Clear();
208             httpClient.DefaultRequestHeaders.UserAgent.TryParseAdd(UserAgent);
209 
210             httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("deflate"));
211             httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip"));
212         }
213 
FlushAsync(CancellationToken cancellationToken)214         public override async Task FlushAsync(CancellationToken cancellationToken)
215         {
216             try
217             {
218                 _outputStream.Seek(0, SeekOrigin.Begin);
219 
220                 using (var contentStream = new StreamContent(_outputStream))
221                 {
222                     contentStream.Headers.ContentType = ContentType ?? new MediaTypeHeaderValue(@"application/x-thrift");
223 
224                     var response = (await _httpClient.PostAsync(_uri, contentStream, cancellationToken)).EnsureSuccessStatusCode();
225 
226                     _inputStream?.Dispose();
227                     _inputStream = await response.Content.ReadAsStreamAsync();
228                     if (_inputStream.CanSeek)
229                     {
230                         _inputStream.Seek(0, SeekOrigin.Begin);
231                     }
232                 }
233             }
234             catch (IOException iox)
235             {
236                 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
237             }
238             catch (HttpRequestException wx)
239             {
240                 throw new TTransportException(TTransportException.ExceptionType.Unknown,
241                     "Couldn't connect to server: " + wx);
242             }
243             catch (Exception ex)
244             {
245                 throw new TTransportException(TTransportException.ExceptionType.Unknown, ex.Message);
246             }
247             finally
248             {
249                 _outputStream = new MemoryStream();
250                 ResetConsumedMessageSize();
251             }
252         }
253 
254 
255         // IDisposable
Dispose(bool disposing)256         protected override void Dispose(bool disposing)
257         {
258             if (!_isDisposed)
259             {
260                 if (disposing)
261                 {
262                     _inputStream?.Dispose();
263                     _outputStream?.Dispose();
264                     _httpClient?.Dispose();
265                 }
266             }
267             _isDisposed = true;
268         }
269     }
270 }
271