// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Collections.Generic;
20 using System.IO;
21 using System.Linq;
22 using System.Net.Http;
23 using System.Net.Http.Headers;
24 using System.Security.Cryptography.X509Certificates;
25 using System.Threading;
26 using System.Threading.Tasks;
27 
28 namespace Thrift.Transports.Client
29 {
30     // ReSharper disable once InconsistentNaming
31     public class THttpClientTransport : TClientTransport
32     {
33         private readonly X509Certificate[] _certificates;
34         private readonly Uri _uri;
35 
36         // Timeouts in milliseconds
37         private int _connectTimeout = 30000;
38         private HttpClient _httpClient;
39         private Stream _inputStream;
40 
41         private bool _isDisposed;
42         private MemoryStream _outputStream = new MemoryStream();
43 
THttpClientTransport(Uri u, IDictionary<string, string> customHeaders)44         public THttpClientTransport(Uri u, IDictionary<string, string> customHeaders)
45             : this(u, Enumerable.Empty<X509Certificate>(), customHeaders)
46         {
47         }
48 
THttpClientTransport(Uri u, IEnumerable<X509Certificate> certificates, IDictionary<string, string> customHeaders)49         public THttpClientTransport(Uri u, IEnumerable<X509Certificate> certificates,
50             IDictionary<string, string> customHeaders)
51         {
52             _uri = u;
53             _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();
54             CustomHeaders = customHeaders;
55 
56             // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809
57             // this can be switched to default way (create client->use->dispose per flush) later
58             _httpClient = CreateClient();
59         }
60 
61         public IDictionary<string, string> CustomHeaders { get; }
62 
63         public int ConnectTimeout
64         {
65             set { _connectTimeout = value; }
66         }
67 
68         public override bool IsOpen => true;
69 
OpenAsync(CancellationToken cancellationToken)70         public override async Task OpenAsync(CancellationToken cancellationToken)
71         {
72             if (cancellationToken.IsCancellationRequested)
73             {
74                 await Task.FromCanceled(cancellationToken);
75             }
76         }
77 
Close()78         public override void Close()
79         {
80             if (_inputStream != null)
81             {
82                 _inputStream.Dispose();
83                 _inputStream = null;
84             }
85 
86             if (_outputStream != null)
87             {
88                 _outputStream.Dispose();
89                 _outputStream = null;
90             }
91 
92             if (_httpClient != null)
93             {
94                 _httpClient.Dispose();
95                 _httpClient = null;
96             }
97         }
98 
ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)99         public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
100             CancellationToken cancellationToken)
101         {
102             if (cancellationToken.IsCancellationRequested)
103             {
104                 return await Task.FromCanceled<int>(cancellationToken);
105             }
106 
107             if (_inputStream == null)
108             {
109                 throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
110             }
111 
112             try
113             {
114                 var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);
115 
116                 if (ret == -1)
117                 {
118                     throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
119                 }
120 
121                 return ret;
122             }
123             catch (IOException iox)
124             {
125                 throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
126             }
127         }
128 
WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)129         public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
130         {
131             if (cancellationToken.IsCancellationRequested)
132             {
133                 await Task.FromCanceled(cancellationToken);
134             }
135 
136             await _outputStream.WriteAsync(buffer, offset, length, cancellationToken);
137         }
138 
CreateClient()139         private HttpClient CreateClient()
140         {
141             var handler = new HttpClientHandler();
142             handler.ClientCertificates.AddRange(_certificates);
143 
144             var httpClient = new HttpClient(handler);
145 
146             if (_connectTimeout > 0)
147             {
148                 httpClient.Timeout = TimeSpan.FromSeconds(_connectTimeout);
149             }
150 
151             httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift"));
152             httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("THttpClientTransport", "0.12.0"));
153 
154             if (CustomHeaders != null)
155             {
156                 foreach (var item in CustomHeaders)
157                 {
158                     httpClient.DefaultRequestHeaders.Add(item.Key, item.Value);
159                 }
160             }
161 
162             return httpClient;
163         }
164 
FlushAsync(CancellationToken cancellationToken)165         public override async Task FlushAsync(CancellationToken cancellationToken)
166         {
167             try
168             {
169                 try
170                 {
171                     if (_outputStream.CanSeek)
172                     {
173                         _outputStream.Seek(0, SeekOrigin.Begin);
174                     }
175 
176                     using (var outStream = new StreamContent(_outputStream))
177                     {
178                         var msg = await _httpClient.PostAsync(_uri, outStream, cancellationToken);
179 
180                         msg.EnsureSuccessStatusCode();
181 
182                         if (_inputStream != null)
183                         {
184                             _inputStream.Dispose();
185                             _inputStream = null;
186                         }
187 
188                         _inputStream = await msg.Content.ReadAsStreamAsync();
189                         if (_inputStream.CanSeek)
190                         {
191                             _inputStream.Seek(0, SeekOrigin.Begin);
192                         }
193                     }
194                 }
195                 catch (IOException iox)
196                 {
197                     throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
198                 }
199                 catch (HttpRequestException wx)
200                 {
201                     throw new TTransportException(TTransportException.ExceptionType.Unknown,
202                         "Couldn't connect to server: " + wx);
203                 }
204             }
205             finally
206             {
207                 _outputStream = new MemoryStream();
208             }
209         }
210 
211         // IDisposable
Dispose(bool disposing)212         protected override void Dispose(bool disposing)
213         {
214             if (!_isDisposed)
215             {
216                 if (disposing)
217                 {
218                     _inputStream?.Dispose();
219                     _outputStream?.Dispose();
220                     _httpClient?.Dispose();
221                 }
222             }
223             _isDisposed = true;
224         }
225     }
226 }
227