// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17 
18 using System;
19 using System.Collections.Generic;
20 using System.IO;
21 using System.Linq;
22 using System.Net.Http;
23 using System.Net.Http.Headers;
24 using System.Security.Cryptography.X509Certificates;
25 using System.Threading;
26 using System.Threading.Tasks;
27 
28 namespace Thrift.Transport.Client
29 {
30     // ReSharper disable once InconsistentNaming
31     public class THttpTransport : TTransport
32     {
33         private readonly X509Certificate[] _certificates;
34         private readonly Uri _uri;
35 
36         private int _connectTimeout = 30000; // Timeouts in milliseconds
37         private HttpClient _httpClient;
38         private Stream _inputStream;
39         private MemoryStream _outputStream = new MemoryStream();
40         private bool _isDisposed;
41 
THttpTransport(Uri uri, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)42         public THttpTransport(Uri uri, IDictionary<string, string> customRequestHeaders = null, string userAgent = null)
43             : this(uri, Enumerable.Empty<X509Certificate>(), customRequestHeaders, userAgent)
44         {
45         }
46 
THttpTransport(Uri uri, IEnumerable<X509Certificate> certificates, IDictionary<string, string> customRequestHeaders, string userAgent = null)47         public THttpTransport(Uri uri, IEnumerable<X509Certificate> certificates,
48             IDictionary<string, string> customRequestHeaders, string userAgent = null)
49         {
50             _uri = uri;
51             _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();
52 
53             if (!string.IsNullOrEmpty(userAgent))
54                 UserAgent = userAgent;
55 
56             // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809
57             // this can be switched to default way (create client->use->dispose per flush) later
58             _httpClient = CreateClient(customRequestHeaders);
59         }
60 
61         // According to RFC 2616 section 3.8, the "User-Agent" header may not carry a version number
62         public readonly string UserAgent = "Thrift netstd THttpClient";
63 
64         public override bool IsOpen => true;
65 
66         public HttpRequestHeaders RequestHeaders => _httpClient.DefaultRequestHeaders;
67 
68         public MediaTypeHeaderValue ContentType { get; set; }
69 
OpenAsync(CancellationToken cancellationToken)70         public override async Task OpenAsync(CancellationToken cancellationToken)
71         {
72             if (cancellationToken.IsCancellationRequested)
73             {
74                 await Task.FromCanceled(cancellationToken);
75             }
76         }
77 
Close()78         public override void Close()
79         {
80             if (_inputStream != null)
81             {
82                 _inputStream.Dispose();
83                 _inputStream = null;
84             }
85 
86             if (_outputStream != null)
87             {
88                 _outputStream.Dispose();
89                 _outputStream = null;
90             }
91 
92             if (_httpClient != null)
93             {
94                 _httpClient.Dispose();
95                 _httpClient = null;
96             }
97         }
98 
ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)99         public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
100         {
101             if (cancellationToken.IsCancellationRequested)
102             {
103                 return await Task.FromCanceled<int>(cancellationToken);
104             }
105 
106             if (_inputStream == null)
107             {
108                 throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
109             }
110 
111             try
112             {
113                 var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);
114 
115                 if (ret == -1)
116                 {
117                     throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
118                 }
119 
120                 return ret;
121             }
122             catch (IOException iox)
123             {
124                 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
125             }
126         }
127 
WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)128         public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
129         {
130             if (cancellationToken.IsCancellationRequested)
131             {
132                 await Task.FromCanceled(cancellationToken);
133             }
134 
135             await _outputStream.WriteAsync(buffer, offset, length, cancellationToken);
136         }
137 
CreateClient(IDictionary<string, string> customRequestHeaders)138         private HttpClient CreateClient(IDictionary<string, string> customRequestHeaders)
139         {
140             var handler = new HttpClientHandler();
141             handler.ClientCertificates.AddRange(_certificates);
142             handler.AutomaticDecompression = System.Net.DecompressionMethods.Deflate | System.Net.DecompressionMethods.GZip;
143 
144             var httpClient = new HttpClient(handler);
145 
146             if (_connectTimeout > 0)
147             {
148                 httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout);
149             }
150 
151             httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift"));
152             httpClient.DefaultRequestHeaders.UserAgent.TryParseAdd(UserAgent);
153 
154             httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("deflate"));
155             httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip"));
156 
157             if (customRequestHeaders != null)
158             {
159                 foreach (var item in customRequestHeaders)
160                 {
161                     httpClient.DefaultRequestHeaders.Add(item.Key, item.Value);
162                 }
163             }
164 
165             return httpClient;
166         }
167 
FlushAsync(CancellationToken cancellationToken)168         public override async Task FlushAsync(CancellationToken cancellationToken)
169         {
170             try
171             {
172                 _outputStream.Seek(0, SeekOrigin.Begin);
173 
174                 using (var contentStream = new StreamContent(_outputStream))
175                 {
176                     contentStream.Headers.ContentType = ContentType ?? new MediaTypeHeaderValue(@"application/x-thrift");
177 
178                     var response = (await _httpClient.PostAsync(_uri, contentStream, cancellationToken)).EnsureSuccessStatusCode();
179 
180                     _inputStream?.Dispose();
181                     _inputStream = await response.Content.ReadAsStreamAsync();
182                     if (_inputStream.CanSeek)
183                     {
184                         _inputStream.Seek(0, SeekOrigin.Begin);
185                     }
186                 }
187             }
188             catch (IOException iox)
189             {
190                 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
191             }
192             catch (HttpRequestException wx)
193             {
194                 throw new TTransportException(TTransportException.ExceptionType.Unknown,
195                     "Couldn't connect to server: " + wx);
196             }
197             catch (Exception ex)
198             {
199                 throw new TTransportException(TTransportException.ExceptionType.Unknown, ex.Message);
200             }
201             finally
202             {
203                 _outputStream = new MemoryStream();
204             }
205         }
206 
207         // IDisposable
Dispose(bool disposing)208         protected override void Dispose(bool disposing)
209         {
210             if (!_isDisposed)
211             {
212                 if (disposing)
213                 {
214                     _inputStream?.Dispose();
215                     _outputStream?.Dispose();
216                     _httpClient?.Dispose();
217                 }
218             }
219             _isDisposed = true;
220         }
221     }
222 }
223