1 use super::{
2     retries2::{RetryAction, RetryLogic},
3     service2::{TowerBatchedSink, TowerRequestSettings},
4     sink, Batch,
5 };
6 use crate::{
7     buffers::Acker,
8     dns::Resolver,
9     event::Event,
10     tls::{tls_connector_builder, MaybeTlsSettings},
11 };
12 use bytes05::{Buf, Bytes};
13 use futures::future::BoxFuture;
14 use futures01::{Async, AsyncSink, Poll as Poll01, Sink, StartSend};
15 use http::header::HeaderValue;
16 use http::{Request, StatusCode};
17 use hyper::body::{self, Body, HttpBody};
18 use hyper::client::HttpConnector;
19 use hyper::Client;
20 use hyper_openssl::HttpsConnector;
21 use serde::{Deserialize, Serialize};
22 use std::{
23     fmt,
24     future::Future,
25     sync::Arc,
26     task::{Context, Poll},
27     time::Duration,
28 };
29 use tower03::Service;
30 use tracing::Span;
31 use tracing_futures::Instrument;
32 
/// The future type produced by [`HttpClient`] (with its default `Body` type
/// parameter) when it is called as a `Service` with an `http::Request<Body>`.
pub type HttpClientFuture = <HttpClient as Service<http::Request<Body>>>::Future;
34 
/// Describes how a sink turns events into HTTP requests.
///
/// `encode_event` maps a single [`Event`] into the sink's `Input` type and
/// `build_request` turns an `Output` value into a complete request whose body
/// is a `Vec<u8>`. In `BatchedHttpSink`, `Input` and `Output` are tied to the
/// input and output types of the `Batch` in use.
#[async_trait::async_trait]
pub trait HttpSink: Send + Sync + 'static {
    /// The encoded form of a single event, as produced by `encode_event`.
    type Input;
    /// The value a request is built from, as consumed by `build_request`.
    type Output;

    /// Encodes one event. When this returns `None`, `BatchedHttpSink` buffers
    /// nothing for the event and still reports the send as accepted.
    fn encode_event(&self, event: Event) -> Option<Self::Input>;
    /// Builds the HTTP request for `events`, or returns an error if the
    /// request cannot be built.
    async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>>;
}
43 
/// Provides a simple wrapper around the internal tower and
/// batching sinks for HTTP.
///
/// This type wraps some `HttpSink` and some `Batch` type
/// and applies request, batch and TLS settings. Internally,
/// it holds an `Arc` reference to the `HttpSink`. It then exposes
/// a `Sink` interface that can be returned from `SinkConfig`.
///
/// Implementation detail: a single item has to be buffered because of
/// how `Sink` works. Each event must be encoded before it can be handed
/// to the inner batching sink, and the inner sink may refuse the encoded
/// item, so one buffer slot is kept for it. To ensure the buffer is
/// fully flushed, make sure `poll_complete` returns ready.
pub struct BatchedHttpSink<T, B, L = HttpRetryLogic>
where
    B: Batch,
    B::Output: Clone + Send + 'static,
    L: RetryLogic<Response = http::Response<Bytes>> + Send + 'static,
{
    // Shared handle to the user-provided sink; used here to encode events and
    // by the request builder inside `inner` to build requests.
    sink: Arc<T>,
    // The batching sink that drives `HttpBatchService` with the retry logic `L`.
    inner: TowerBatchedSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Vec<u8>>>>, B::Output>,
        B,
        L,
        B::Output,
    >,
    // A single slot is needed to buffer an item that has already been encoded
    // while the inner sink is applying back pressure. This trick is used in the
    // `WithFlatMap` sink combinator:
    // https://docs.rs/futures/0.1.29/src/futures/sink/with_flat_map.rs.html#20
    slot: Option<B::Input>,
}
75 
76 impl<T, B> BatchedHttpSink<T, B, HttpRetryLogic>
77 where
78     B: Batch,
79     B::Output: Clone + Send + 'static,
80     T: HttpSink<Input = B::Input, Output = B::Output>,
81 {
new( sink: T, batch: B, request_settings: TowerRequestSettings, batch_timeout: Duration, client: HttpClient, acker: Acker, ) -> Self82     pub fn new(
83         sink: T,
84         batch: B,
85         request_settings: TowerRequestSettings,
86         batch_timeout: Duration,
87         client: HttpClient,
88         acker: Acker,
89     ) -> Self {
90         Self::with_retry_logic(
91             sink,
92             batch,
93             HttpRetryLogic,
94             request_settings,
95             batch_timeout,
96             client,
97             acker,
98         )
99     }
100 }
101 
102 impl<T, B, L> BatchedHttpSink<T, B, L>
103 where
104     B: Batch,
105     B::Output: Clone + Send + 'static,
106     L: RetryLogic<Response = http::Response<Bytes>, Error = hyper::Error> + Send + 'static,
107     T: HttpSink<Input = B::Input, Output = B::Output>,
108 {
with_retry_logic( sink: T, batch: B, logic: L, request_settings: TowerRequestSettings, batch_timeout: Duration, client: HttpClient, acker: Acker, ) -> Self109     pub fn with_retry_logic(
110         sink: T,
111         batch: B,
112         logic: L,
113         request_settings: TowerRequestSettings,
114         batch_timeout: Duration,
115         client: HttpClient,
116         acker: Acker,
117     ) -> Self {
118         let sink = Arc::new(sink);
119 
120         let sink1 = Arc::clone(&sink);
121         let request_builder =
122             move |b| -> BoxFuture<'static, crate::Result<http::Request<Vec<u8>>>> {
123                 let sink = Arc::clone(&sink1);
124                 Box::pin(async move { sink.build_request(b).await })
125             };
126 
127         let svc = HttpBatchService::new(client, request_builder);
128         let inner = request_settings.batch_sink(logic, svc, batch, batch_timeout, acker);
129 
130         Self {
131             sink,
132             inner,
133             slot: None,
134         }
135     }
136 }
137 
138 impl<T, B, L> Sink for BatchedHttpSink<T, B, L>
139 where
140     B: Batch,
141     B::Output: Clone + Send + 'static,
142     T: HttpSink<Input = B::Input, Output = B::Output>,
143     L: RetryLogic<Response = http::Response<Bytes>> + Send + 'static,
144 {
145     type SinkItem = crate::Event;
146     type SinkError = crate::Error;
147 
start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>148     fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
149         if self.slot.is_some() && self.poll_complete()?.is_not_ready() {
150             return Ok(AsyncSink::NotReady(item));
151         }
152         assert!(self.slot.is_none(), "poll_complete did not clear slot");
153 
154         if let Some(item) = self.sink.encode_event(item) {
155             self.slot = Some(item);
156             self.poll_complete()?;
157         }
158 
159         Ok(AsyncSink::Ready)
160     }
161 
poll_complete(&mut self) -> Poll01<(), Self::SinkError>162     fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
163         if let Some(item) = self.slot.take() {
164             if let AsyncSink::NotReady(item) = self.inner.start_send(item)? {
165                 self.slot = Some(item);
166                 return Ok(Async::NotReady);
167             }
168         }
169 
170         self.inner.poll_complete()
171     }
172 }
173 
/// An HTTP client wrapping a hyper `Client` that connects through an
/// `HttpsConnector` layered over an `HttpConnector` using the project
/// `Resolver`.
pub struct HttpClient<B = Body> {
    // The underlying hyper client.
    client: Client<HttpsConnector<HttpConnector<Resolver>>, B>,
    // Span entered while a request is dispatched and attached to the
    // response future.
    span: Span,
    // Value inserted as the `User-Agent` header when a request has none.
    user_agent: HeaderValue,
}
179 
180 impl<B> HttpClient<B>
181 where
182     B: HttpBody + Send + 'static,
183     B::Data: Send,
184     B::Error: Into<crate::Error>,
185 {
new( resolver: Resolver, tls_settings: impl Into<MaybeTlsSettings>, ) -> crate::Result<HttpClient<B>>186     pub fn new(
187         resolver: Resolver,
188         tls_settings: impl Into<MaybeTlsSettings>,
189     ) -> crate::Result<HttpClient<B>> {
190         let mut http = HttpConnector::new_with_resolver(resolver);
191         http.enforce_http(false);
192 
193         let settings = tls_settings.into();
194         let tls = tls_connector_builder(&settings)?;
195         let mut https = HttpsConnector::with_connector(http, tls)?;
196 
197         let settings = settings.tls().cloned();
198         https.set_callback(move |c, _uri| {
199             if let Some(settings) = &settings {
200                 settings.apply_connect_configuration(c);
201             }
202 
203             Ok(())
204         });
205 
206         let client = Client::builder().build(https);
207 
208         let version = crate::get_version();
209         let user_agent = HeaderValue::from_str(&format!("Vector/{}", version))
210             .expect("Invalid header value for version!");
211 
212         let span = tracing::info_span!("http");
213 
214         Ok(HttpClient {
215             client,
216             span,
217             user_agent,
218         })
219     }
220 
send(&mut self, request: Request<B>) -> crate::Result<http::Response<Body>>221     pub async fn send(&mut self, request: Request<B>) -> crate::Result<http::Response<Body>> {
222         self.call(request).await.map_err(Into::into)
223     }
224 }
225 
226 impl<B> Service<Request<B>> for HttpClient<B>
227 where
228     B: HttpBody + Send + 'static,
229     B::Data: Send,
230     B::Error: Into<crate::Error>,
231 {
232     type Response = http::Response<Body>;
233     type Error = hyper::Error;
234     type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
235 
poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>236     fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
237         Poll::Ready(Ok(()))
238     }
239 
call(&mut self, mut request: Request<B>) -> Self::Future240     fn call(&mut self, mut request: Request<B>) -> Self::Future {
241         let _enter = self.span.enter();
242 
243         if !request.headers().contains_key("User-Agent") {
244             request
245                 .headers_mut()
246                 .insert("User-Agent", self.user_agent.clone());
247         }
248 
249         debug!(message = "sending request.", uri = %request.uri(), method = %request.method());
250 
251         let response = self.client.request(request);
252 
253         let fut = async move {
254             let res = response.await?;
255             debug!(
256                     message = "response.",
257                     status = ?res.status(),
258                     version = ?res.version(),
259             );
260             Ok(res)
261         }
262         .instrument(self.span.clone());
263 
264         Box::pin(fut)
265     }
266 }
267 
268 impl<B> Clone for HttpClient<B> {
clone(&self) -> Self269     fn clone(&self) -> Self {
270         Self {
271             client: self.client.clone(),
272             span: self.span.clone(),
273             user_agent: self.user_agent.clone(),
274         }
275     }
276 }
277 
278 impl<B> fmt::Debug for HttpClient<B> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result279     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280         f.debug_struct("HttpClient")
281             .field("client", &self.client)
282             .field("user_agent", &self.user_agent)
283             .finish()
284     }
285 }
286 
/// A tower `Service` that turns a batch value of type `B` into an HTTP
/// request via `request_builder`, sends it with the inner [`HttpClient`], and
/// returns the response with its body collected into `Bytes`.
pub struct HttpBatchService<F, B = Vec<u8>> {
    // Client used to send the built requests.
    inner: HttpClient<Body>,
    // Shared closure that builds a request future `F` from a batch value.
    request_builder: Arc<dyn Fn(B) -> F + Send + Sync>,
}
291 
292 impl<F, B> HttpBatchService<F, B> {
new( inner: HttpClient, request_builder: impl Fn(B) -> F + Send + Sync + 'static, ) -> Self293     pub fn new(
294         inner: HttpClient,
295         request_builder: impl Fn(B) -> F + Send + Sync + 'static,
296     ) -> Self {
297         HttpBatchService {
298             inner,
299             request_builder: Arc::new(Box::new(request_builder)),
300         }
301     }
302 }
303 
304 impl<F, B> Service<B> for HttpBatchService<F, B>
305 where
306     F: Future<Output = crate::Result<hyper::Request<Vec<u8>>>> + Send + 'static,
307     B: Send + 'static,
308 {
309     type Response = http::Response<Bytes>;
310     type Error = crate::Error;
311     type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
312 
poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>313     fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
314         Poll::Ready(Ok(()))
315     }
316 
call(&mut self, body: B) -> Self::Future317     fn call(&mut self, body: B) -> Self::Future {
318         let request_builder = Arc::clone(&self.request_builder);
319         let mut http_client = self.inner.clone();
320 
321         Box::pin(async move {
322             let request = request_builder(body).await?.map(Body::from);
323             let response = http_client.call(request).await?;
324             let (parts, body) = response.into_parts();
325             let mut body = body::aggregate(body).await?;
326             Ok(hyper::Response::from_parts(parts, body.to_bytes()))
327         })
328     }
329 }
330 
331 impl<F, B> Clone for HttpBatchService<F, B> {
clone(&self) -> Self332     fn clone(&self) -> Self {
333         Self {
334             inner: self.inner.clone(),
335             request_builder: self.request_builder.clone(),
336         }
337     }
338 }
339 
340 impl<T: fmt::Debug> sink::Response for http::Response<T> {
is_successful(&self) -> bool341     fn is_successful(&self) -> bool {
342         self.status().is_success()
343     }
344 }
345 
/// The default retry policy for HTTP sinks; see its `RetryLogic`
/// implementation for which errors and response statuses are retried.
#[derive(Clone)]
pub struct HttpRetryLogic;
348 
349 impl RetryLogic for HttpRetryLogic {
350     type Error = hyper::Error;
351     type Response = hyper::Response<Bytes>;
352 
is_retriable_error(&self, error: &Self::Error) -> bool353     fn is_retriable_error(&self, error: &Self::Error) -> bool {
354         error.is_connect() || error.is_closed()
355     }
356 
should_retry_response(&self, response: &Self::Response) -> RetryAction357     fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
358         let status = response.status();
359 
360         match status {
361             StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("Too many requests".into()),
362             StatusCode::NOT_IMPLEMENTED => {
363                 RetryAction::DontRetry("endpoint not implemented".into())
364             }
365             _ if status.is_server_error() => RetryAction::Retry(format!(
366                 "{}: {}",
367                 status,
368                 String::from_utf8_lossy(response.body())
369             )),
370             _ if status.is_success() => RetryAction::Successful,
371             _ => RetryAction::DontRetry(format!("response status: {}", status)),
372         }
373     }
374 }
375 
/// Authentication strategies that can be applied to an outgoing request.
///
/// With the serde attributes below, the variant is selected by a `strategy`
/// tag whose value is the snake_case variant name (`basic` or `bearer`), and
/// unknown fields are rejected.
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
pub enum Auth {
    /// Basic authentication with a user name and password.
    Basic { user: String, password: String },
    /// Bearer authentication with a token.
    Bearer { token: String },
}
382 
383 impl Auth {
apply<B>(&self, req: &mut Request<B>)384     pub fn apply<B>(&self, req: &mut Request<B>) {
385         use headers::{Authorization, HeaderMapExt};
386 
387         match &self {
388             Auth::Basic { user, password } => {
389                 let auth = Authorization::basic(&user, &password);
390                 req.headers_mut().typed_insert(auth);
391             }
392             Auth::Bearer { token } => match Authorization::bearer(&token) {
393                 Ok(auth) => req.headers_mut().typed_insert(auth),
394                 Err(error) => error!(message = "invalid bearer token", %token, %error),
395             },
396         }
397     }
398 }
399 
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_util::runtime;
    use bytes05::Buf;
    use futures::future::ready;
    use futures01::{Future, Stream};
    use hyper::service::{make_service_fn, service_fn};
    use hyper::{Body, Response, Server, Uri};
    use tower03::Service;

    /// Checks the retry decision of `HttpRetryLogic` for a few status codes:
    /// 429 and 500 are retryable, 400 and 501 are not.
    #[test]
    fn util_http_retry_logic() {
        let logic = HttpRetryLogic;

        let response_429 = Response::builder().status(429).body(Bytes::new()).unwrap();
        let response_500 = Response::builder().status(500).body(Bytes::new()).unwrap();
        let response_400 = Response::builder().status(400).body(Bytes::new()).unwrap();
        let response_501 = Response::builder().status(501).body(Bytes::new()).unwrap();

        assert!(logic.should_retry_response(&response_429).is_retryable());
        assert!(logic.should_retry_response(&response_500).is_retryable());
        assert!(logic
            .should_retry_response(&response_400)
            .is_not_retryable());
        assert!(logic
            .should_retry_response(&response_501)
            .is_not_retryable());
    }

    /// Sends one request through `HttpBatchService` to a local hyper server
    /// and checks that the server received the request body.
    #[test]
    fn util_http_it_makes_http_requests() {
        let addr = crate::test_util::next_addr();
        let resolver = Resolver;

        let uri = format!("http://{}:{}/", addr.ip(), addr.port())
            .parse::<Uri>()
            .unwrap();

        let request = b"hello".to_vec();
        let client = HttpClient::new(resolver, None).unwrap();
        // The request builder posts the batch bytes to the test server.
        let mut service = HttpBatchService::new(client, move |body: Vec<u8>| {
            Box::pin(ready(Request::post(&uri).body(body).map_err(Into::into)))
        });

        // Channel through which the server reports the bodies it receives.
        let (tx, rx) = futures01::sync::mpsc::channel(10);

        let new_service = make_service_fn(move |_| {
            let tx = tx.clone();

            let svc = service_fn(move |req| {
                let mut tx = tx.clone();

                async move {
                    // Read the whole request body, forward it as a string over
                    // the channel, and answer with an empty response.
                    let body = hyper::body::aggregate(req.into_body())
                        .await
                        .map_err(|e| format!("error: {}", e))?;
                    let string = String::from_utf8(body.bytes().into())
                        .map_err(|_| "Wasn't UTF-8".to_string())?;
                    tx.try_send(string).map_err(|_| "Send error".to_string())?;

                    Ok::<_, crate::Error>(Response::new(Body::from("")))
                }
            });

            async move { Ok::<_, std::convert::Infallible>(svc) }
        });

        let mut rt = runtime();

        // Run the server in the background; failures are only printed.
        rt.spawn_std(async move {
            if let Err(e) = Server::bind(&addr).serve(new_service).await {
                eprintln!("server error: {}", e);
            }
        });

        rt.block_on_std(async move {
            // Presumably gives the spawned server time to start listening
            // before the request is sent.
            tokio::time::delay_for(std::time::Duration::from_millis(50)).await;
            service.call(request).await
        })
        .unwrap();

        let _ = rt.shutdown_now();

        // The first item on the channel must be the body that was sent.
        let (body, _rest) = rx.into_future().wait().unwrap();
        assert_eq!(body.unwrap(), "hello");
    }
}
488