1 use super::{ 2 retries2::{RetryAction, RetryLogic}, 3 service2::{TowerBatchedSink, TowerRequestSettings}, 4 sink, Batch, 5 }; 6 use crate::{ 7 buffers::Acker, 8 dns::Resolver, 9 event::Event, 10 tls::{tls_connector_builder, MaybeTlsSettings}, 11 }; 12 use bytes05::{Buf, Bytes}; 13 use futures::future::BoxFuture; 14 use futures01::{Async, AsyncSink, Poll as Poll01, Sink, StartSend}; 15 use http::header::HeaderValue; 16 use http::{Request, StatusCode}; 17 use hyper::body::{self, Body, HttpBody}; 18 use hyper::client::HttpConnector; 19 use hyper::Client; 20 use hyper_openssl::HttpsConnector; 21 use serde::{Deserialize, Serialize}; 22 use std::{ 23 fmt, 24 future::Future, 25 sync::Arc, 26 task::{Context, Poll}, 27 time::Duration, 28 }; 29 use tower03::Service; 30 use tracing::Span; 31 use tracing_futures::Instrument; 32 33 pub type HttpClientFuture = <HttpClient as Service<http::Request<Body>>>::Future; 34 35 #[async_trait::async_trait] 36 pub trait HttpSink: Send + Sync + 'static { 37 type Input; 38 type Output; 39 encode_event(&self, event: Event) -> Option<Self::Input>40 fn encode_event(&self, event: Event) -> Option<Self::Input>; build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>>41 async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>>; 42 } 43 44 /// Provides a simple wrapper around internal tower and 45 /// batching sinks for http. 46 /// 47 /// This type wraps some `HttpSink` and some `Batch` type 48 /// and will apply request, batch and tls settings. Internally, 49 /// it holds an Arc reference to the `HttpSink`. It then exposes 50 /// a `Sink` interface that can be returned from `SinkConfig`. 51 /// 52 /// Implementation details we require to buffer a single item due 53 /// to how `Sink` works. This is because we must "encode" the type 54 /// to be able to send it to the inner batch type and sink. Because of 55 /// this we must provide a single buffer slot. To ensure the buffer is 56 /// fully flushed make sure `poll_complete` returns ready. 57 pub struct BatchedHttpSink<T, B, L = HttpRetryLogic> 58 where 59 B: Batch, 60 B::Output: Clone + Send + 'static, 61 L: RetryLogic<Response = http::Response<Bytes>> + Send + 'static, 62 { 63 sink: Arc<T>, 64 inner: TowerBatchedSink< 65 HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Vec<u8>>>>, B::Output>, 66 B, 67 L, 68 B::Output, 69 >, 70 // An empty slot is needed to buffer an item where we encoded it but 71 // the inner sink is applying back pressure. This trick is used in the `WithFlatMap` 72 // sink combinator. https://docs.rs/futures/0.1.29/src/futures/sink/with_flat_map.rs.html#20 73 slot: Option<B::Input>, 74 } 75 76 impl<T, B> BatchedHttpSink<T, B, HttpRetryLogic> 77 where 78 B: Batch, 79 B::Output: Clone + Send + 'static, 80 T: HttpSink<Input = B::Input, Output = B::Output>, 81 { new( sink: T, batch: B, request_settings: TowerRequestSettings, batch_timeout: Duration, client: HttpClient, acker: Acker, ) -> Self82 pub fn new( 83 sink: T, 84 batch: B, 85 request_settings: TowerRequestSettings, 86 batch_timeout: Duration, 87 client: HttpClient, 88 acker: Acker, 89 ) -> Self { 90 Self::with_retry_logic( 91 sink, 92 batch, 93 HttpRetryLogic, 94 request_settings, 95 batch_timeout, 96 client, 97 acker, 98 ) 99 } 100 } 101 102 impl<T, B, L> BatchedHttpSink<T, B, L> 103 where 104 B: Batch, 105 B::Output: Clone + Send + 'static, 106 L: RetryLogic<Response = http::Response<Bytes>, Error = hyper::Error> + Send + 'static, 107 T: HttpSink<Input = B::Input, Output = B::Output>, 108 { with_retry_logic( sink: T, batch: B, logic: L, request_settings: TowerRequestSettings, batch_timeout: Duration, client: HttpClient, acker: Acker, ) -> Self109 pub fn with_retry_logic( 110 sink: T, 111 batch: B, 112 logic: L, 113 request_settings: TowerRequestSettings, 114 batch_timeout: Duration, 115 client: HttpClient, 116 acker: Acker, 117 ) -> Self { 118 let sink = Arc::new(sink); 119 120 let sink1 = Arc::clone(&sink); 121 let request_builder = 122 move |b| -> BoxFuture<'static, crate::Result<http::Request<Vec<u8>>>> { 123 let sink = Arc::clone(&sink1); 124 Box::pin(async move { sink.build_request(b).await }) 125 }; 126 127 let svc = HttpBatchService::new(client, request_builder); 128 let inner = request_settings.batch_sink(logic, svc, batch, batch_timeout, acker); 129 130 Self { 131 sink, 132 inner, 133 slot: None, 134 } 135 } 136 } 137 138 impl<T, B, L> Sink for BatchedHttpSink<T, B, L> 139 where 140 B: Batch, 141 B::Output: Clone + Send + 'static, 142 T: HttpSink<Input = B::Input, Output = B::Output>, 143 L: RetryLogic<Response = http::Response<Bytes>> + Send + 'static, 144 { 145 type SinkItem = crate::Event; 146 type SinkError = crate::Error; 147 start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>148 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { 149 if self.slot.is_some() && self.poll_complete()?.is_not_ready() { 150 return Ok(AsyncSink::NotReady(item)); 151 } 152 assert!(self.slot.is_none(), "poll_complete did not clear slot"); 153 154 if let Some(item) = self.sink.encode_event(item) { 155 self.slot = Some(item); 156 self.poll_complete()?; 157 } 158 159 Ok(AsyncSink::Ready) 160 } 161 poll_complete(&mut self) -> Poll01<(), Self::SinkError>162 fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> { 163 if let Some(item) = self.slot.take() { 164 if let AsyncSink::NotReady(item) = self.inner.start_send(item)? { 165 self.slot = Some(item); 166 return Ok(Async::NotReady); 167 } 168 } 169 170 self.inner.poll_complete() 171 } 172 } 173 174 pub struct HttpClient<B = Body> { 175 client: Client<HttpsConnector<HttpConnector<Resolver>>, B>, 176 span: Span, 177 user_agent: HeaderValue, 178 } 179 180 impl<B> HttpClient<B> 181 where 182 B: HttpBody + Send + 'static, 183 B::Data: Send, 184 B::Error: Into<crate::Error>, 185 { new( resolver: Resolver, tls_settings: impl Into<MaybeTlsSettings>, ) -> crate::Result<HttpClient<B>>186 pub fn new( 187 resolver: Resolver, 188 tls_settings: impl Into<MaybeTlsSettings>, 189 ) -> crate::Result<HttpClient<B>> { 190 let mut http = HttpConnector::new_with_resolver(resolver); 191 http.enforce_http(false); 192 193 let settings = tls_settings.into(); 194 let tls = tls_connector_builder(&settings)?; 195 let mut https = HttpsConnector::with_connector(http, tls)?; 196 197 let settings = settings.tls().cloned(); 198 https.set_callback(move |c, _uri| { 199 if let Some(settings) = &settings { 200 settings.apply_connect_configuration(c); 201 } 202 203 Ok(()) 204 }); 205 206 let client = Client::builder().build(https); 207 208 let version = crate::get_version(); 209 let user_agent = HeaderValue::from_str(&format!("Vector/{}", version)) 210 .expect("Invalid header value for version!"); 211 212 let span = tracing::info_span!("http"); 213 214 Ok(HttpClient { 215 client, 216 span, 217 user_agent, 218 }) 219 } 220 send(&mut self, request: Request<B>) -> crate::Result<http::Response<Body>>221 pub async fn send(&mut self, request: Request<B>) -> crate::Result<http::Response<Body>> { 222 self.call(request).await.map_err(Into::into) 223 } 224 } 225 226 impl<B> Service<Request<B>> for HttpClient<B> 227 where 228 B: HttpBody + Send + 'static, 229 B::Data: Send, 230 B::Error: Into<crate::Error>, 231 { 232 type Response = http::Response<Body>; 233 type Error = hyper::Error; 234 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; 235 poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>236 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 237 Poll::Ready(Ok(())) 238 } 239 call(&mut self, mut request: Request<B>) -> Self::Future240 fn call(&mut self, mut request: Request<B>) -> Self::Future { 241 let _enter = self.span.enter(); 242 243 if !request.headers().contains_key("User-Agent") { 244 request 245 .headers_mut() 246 .insert("User-Agent", self.user_agent.clone()); 247 } 248 249 debug!(message = "sending request.", uri = %request.uri(), method = %request.method()); 250 251 let response = self.client.request(request); 252 253 let fut = async move { 254 let res = response.await?; 255 debug!( 256 message = "response.", 257 status = ?res.status(), 258 version = ?res.version(), 259 ); 260 Ok(res) 261 } 262 .instrument(self.span.clone()); 263 264 Box::pin(fut) 265 } 266 } 267 268 impl<B> Clone for HttpClient<B> { clone(&self) -> Self269 fn clone(&self) -> Self { 270 Self { 271 client: self.client.clone(), 272 span: self.span.clone(), 273 user_agent: self.user_agent.clone(), 274 } 275 } 276 } 277 278 impl<B> fmt::Debug for HttpClient<B> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result279 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 280 f.debug_struct("HttpClient") 281 .field("client", &self.client) 282 .field("user_agent", &self.user_agent) 283 .finish() 284 } 285 } 286 287 pub struct HttpBatchService<F, B = Vec<u8>> { 288 inner: HttpClient<Body>, 289 request_builder: Arc<dyn Fn(B) -> F + Send + Sync>, 290 } 291 292 impl<F, B> HttpBatchService<F, B> { new( inner: HttpClient, request_builder: impl Fn(B) -> F + Send + Sync + 'static, ) -> Self293 pub fn new( 294 inner: HttpClient, 295 request_builder: impl Fn(B) -> F + Send + Sync + 'static, 296 ) -> Self { 297 HttpBatchService { 298 inner, 299 request_builder: Arc::new(Box::new(request_builder)), 300 } 301 } 302 } 303 304 impl<F, B> Service<B> for HttpBatchService<F, B> 305 where 306 F: Future<Output = crate::Result<hyper::Request<Vec<u8>>>> + Send + 'static, 307 B: Send + 'static, 308 { 309 type Response = http::Response<Bytes>; 310 type Error = crate::Error; 311 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; 312 poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>313 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 314 Poll::Ready(Ok(())) 315 } 316 call(&mut self, body: B) -> Self::Future317 fn call(&mut self, body: B) -> Self::Future { 318 let request_builder = Arc::clone(&self.request_builder); 319 let mut http_client = self.inner.clone(); 320 321 Box::pin(async move { 322 let request = request_builder(body).await?.map(Body::from); 323 let response = http_client.call(request).await?; 324 let (parts, body) = response.into_parts(); 325 let mut body = body::aggregate(body).await?; 326 Ok(hyper::Response::from_parts(parts, body.to_bytes())) 327 }) 328 } 329 } 330 331 impl<F, B> Clone for HttpBatchService<F, B> { clone(&self) -> Self332 fn clone(&self) -> Self { 333 Self { 334 inner: self.inner.clone(), 335 request_builder: self.request_builder.clone(), 336 } 337 } 338 } 339 340 impl<T: fmt::Debug> sink::Response for http::Response<T> { is_successful(&self) -> bool341 fn is_successful(&self) -> bool { 342 self.status().is_success() 343 } 344 } 345 346 #[derive(Clone)] 347 pub struct HttpRetryLogic; 348 349 impl RetryLogic for HttpRetryLogic { 350 type Error = hyper::Error; 351 type Response = hyper::Response<Bytes>; 352 is_retriable_error(&self, error: &Self::Error) -> bool353 fn is_retriable_error(&self, error: &Self::Error) -> bool { 354 error.is_connect() || error.is_closed() 355 } 356 should_retry_response(&self, response: &Self::Response) -> RetryAction357 fn should_retry_response(&self, response: &Self::Response) -> RetryAction { 358 let status = response.status(); 359 360 match status { 361 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("Too many requests".into()), 362 StatusCode::NOT_IMPLEMENTED => { 363 RetryAction::DontRetry("endpoint not implemented".into()) 364 } 365 _ if status.is_server_error() => RetryAction::Retry(format!( 366 "{}: {}", 367 status, 368 String::from_utf8_lossy(response.body()) 369 )), 370 _ if status.is_success() => RetryAction::Successful, 371 _ => RetryAction::DontRetry(format!("response status: {}", status)), 372 } 373 } 374 } 375 376 #[derive(Deserialize, Serialize, Clone, Debug)] 377 #[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")] 378 pub enum Auth { 379 Basic { user: String, password: String }, 380 Bearer { token: String }, 381 } 382 383 impl Auth { apply<B>(&self, req: &mut Request<B>)384 pub fn apply<B>(&self, req: &mut Request<B>) { 385 use headers::{Authorization, HeaderMapExt}; 386 387 match &self { 388 Auth::Basic { user, password } => { 389 let auth = Authorization::basic(&user, &password); 390 req.headers_mut().typed_insert(auth); 391 } 392 Auth::Bearer { token } => match Authorization::bearer(&token) { 393 Ok(auth) => req.headers_mut().typed_insert(auth), 394 Err(error) => error!(message = "invalid bearer token", %token, %error), 395 }, 396 } 397 } 398 } 399 400 #[cfg(test)] 401 mod test { 402 use super::*; 403 use crate::test_util::runtime; 404 use bytes05::Buf; 405 use futures::future::ready; 406 use futures01::{Future, Stream}; 407 use hyper::service::{make_service_fn, service_fn}; 408 use hyper::{Body, Response, Server, Uri}; 409 use tower03::Service; 410 411 #[test] util_http_retry_logic()412 fn util_http_retry_logic() { 413 let logic = HttpRetryLogic; 414 415 let response_429 = Response::builder().status(429).body(Bytes::new()).unwrap(); 416 let response_500 = Response::builder().status(500).body(Bytes::new()).unwrap(); 417 let response_400 = Response::builder().status(400).body(Bytes::new()).unwrap(); 418 let response_501 = Response::builder().status(501).body(Bytes::new()).unwrap(); 419 420 assert!(logic.should_retry_response(&response_429).is_retryable()); 421 assert!(logic.should_retry_response(&response_500).is_retryable()); 422 assert!(logic 423 .should_retry_response(&response_400) 424 .is_not_retryable()); 425 assert!(logic 426 .should_retry_response(&response_501) 427 .is_not_retryable()); 428 } 429 430 #[test] util_http_it_makes_http_requests()431 fn util_http_it_makes_http_requests() { 432 let addr = crate::test_util::next_addr(); 433 let resolver = Resolver; 434 435 let uri = format!("http://{}:{}/", addr.ip(), addr.port()) 436 .parse::<Uri>() 437 .unwrap(); 438 439 let request = b"hello".to_vec(); 440 let client = HttpClient::new(resolver, None).unwrap(); 441 let mut service = HttpBatchService::new(client, move |body: Vec<u8>| { 442 Box::pin(ready(Request::post(&uri).body(body).map_err(Into::into))) 443 }); 444 445 let (tx, rx) = futures01::sync::mpsc::channel(10); 446 447 let new_service = make_service_fn(move |_| { 448 let tx = tx.clone(); 449 450 let svc = service_fn(move |req| { 451 let mut tx = tx.clone(); 452 453 async move { 454 let body = hyper::body::aggregate(req.into_body()) 455 .await 456 .map_err(|e| format!("error: {}", e))?; 457 let string = String::from_utf8(body.bytes().into()) 458 .map_err(|_| "Wasn't UTF-8".to_string())?; 459 tx.try_send(string).map_err(|_| "Send error".to_string())?; 460 461 Ok::<_, crate::Error>(Response::new(Body::from(""))) 462 } 463 }); 464 465 async move { Ok::<_, std::convert::Infallible>(svc) } 466 }); 467 468 let mut rt = runtime(); 469 470 rt.spawn_std(async move { 471 if let Err(e) = Server::bind(&addr).serve(new_service).await { 472 eprintln!("server error: {}", e); 473 } 474 }); 475 476 rt.block_on_std(async move { 477 tokio::time::delay_for(std::time::Duration::from_millis(50)).await; 478 service.call(request).await 479 }) 480 .unwrap(); 481 482 let _ = rt.shutdown_now(); 483 484 let (body, _rest) = rx.into_future().wait().unwrap(); 485 assert_eq!(body.unwrap(), "hello"); 486 } 487 } 488