1 use crate::{
2 event::{self, Event},
3 sinks::util::{
4 encoding::{EncodingConfig, EncodingConfiguration},
5 http::{Auth, BatchedHttpSink, HttpClient, HttpSink},
6 service2::TowerRequestConfig,
7 BatchConfig, BatchSettings, Buffer, Compression, UriSerde,
8 },
9 tls::{TlsOptions, TlsSettings},
10 topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
11 };
12 use futures::{FutureExt, TryFutureExt};
13 use futures01::{future, Sink};
14 use http::{
15 header::{self, HeaderName, HeaderValue},
16 Method, Request, StatusCode, Uri,
17 };
18 use hyper::Body;
19 use indexmap::IndexMap;
20 use lazy_static::lazy_static;
21 use serde::{Deserialize, Serialize};
22 use snafu::{ResultExt, Snafu};
23
/// Configuration errors detected while validating user-supplied HTTP headers
/// (see `validate_headers`).
#[derive(Debug, Snafu)]
enum BuildError {
    /// A configured header name is not a valid HTTP header name.
    #[snafu(display("{}: {}", source, name))]
    InvalidHeaderName {
        name: String,
        source: header::InvalidHeaderName,
    },
    /// A configured header value is not a valid HTTP header value.
    #[snafu(display("{}: {}", source, value))]
    InvalidHeaderValue {
        value: String,
        source: header::InvalidHeaderValue,
    },
}
37
/// User configuration for the `http` sink.
///
/// Unknown fields are rejected at deserialization time.
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HttpSinkConfig {
    /// Destination URI. Normalized by `build_uri`: a missing scheme becomes `http`
    /// and a missing authority becomes `127.0.0.1`.
    pub uri: UriSerde,
    /// HTTP method for data requests; `None` means `POST`.
    pub method: Option<HttpMethod>,
    /// When set, the healthcheck sends a `HEAD` request here and requires `200 OK`.
    /// When unset, the healthcheck always succeeds.
    pub healthcheck_uri: Option<UriSerde>,
    /// Authentication applied to data requests and to the healthcheck request.
    /// Cannot be combined with an explicit `Authorization` entry in `headers`.
    pub auth: Option<Auth>,
    /// Extra headers added to every data request. They are validated when the sink is built.
    pub headers: Option<IndexMap<String, String>>,
    /// Compression for request bodies. When it has a content encoding, that value
    /// is sent as `Content-Encoding`.
    #[serde(default)]
    pub compression: Compression,
    /// Body encoding (`text`, `ndjson` or `json`) plus field rules applied to each event.
    pub encoding: EncodingConfig<Encoding>,
    /// Batching options. Unset values fall back to a 10 MiB size limit and a timeout
    /// of `1` (unit defined by `BatchSettings`, presumably seconds; TODO confirm).
    #[serde(default)]
    pub batch: BatchConfig,
    /// Request/concurrency options, merged with `REQUEST_DEFAULTS`.
    #[serde(default)]
    pub request: TowerRequestConfig,
    /// TLS options for the HTTP client.
    pub tls: Option<TlsOptions>,
}
55
/// Test helper: builds an `HttpSinkConfig` where every option is at its default
/// except the codec, which is set to `e`.
#[cfg(test)]
fn default_config(e: Encoding) -> HttpSinkConfig {
    let encoding = EncodingConfig::from(e);
    HttpSinkConfig {
        uri: Default::default(),
        encoding,
        method: None,
        healthcheck_uri: None,
        auth: None,
        headers: None,
        tls: None,
        compression: Default::default(),
        batch: Default::default(),
        request: Default::default(),
    }
}
71
72 lazy_static! {
73 static ref REQUEST_DEFAULTS: TowerRequestConfig = TowerRequestConfig {
74 in_flight_limit: Some(10),
75 timeout_secs: Some(30),
76 rate_limit_num: Some(10),
77 ..Default::default()
78 };
79 }
80
/// HTTP method used for data requests. It is (de)serialized in snake_case
/// (`"post"`, `"put"`), and `Post` is the `Default`.
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
pub enum HttpMethod {
    #[derivative(Default)]
    Post,
    Put,
}
89
/// Request body format.
///
/// - `Text`: the message field of each event, newline-terminated (`text/plain`).
/// - `Ndjson`: one JSON object per line (`application/x-ndjson`).
/// - `Json`: the whole batch sent as one JSON array (`application/json`).
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum Encoding {
    Text,
    Ndjson,
    Json,
}
97
// Register this sink's description under the name "http" with the `inventory`
// registry. NOTE(review): `new_without_default` presumably means no default/example
// config is generated for this sink (e.g. because `uri` is required); confirm.
inventory::submit! {
    SinkDescription::new_without_default::<HttpSinkConfig>("http")
}
101
102 #[typetag::serde(name = "http")]
103 impl SinkConfig for HttpSinkConfig {
build(&self, cx: SinkContext) -> crate::Result<(super::RouterSink, super::Healthcheck)>104 fn build(&self, cx: SinkContext) -> crate::Result<(super::RouterSink, super::Healthcheck)> {
105 validate_headers(&self.headers, &self.auth)?;
106 let tls = TlsSettings::from_options(&self.tls)?;
107 let client = HttpClient::new(cx.resolver(), tls)?;
108
109 let mut config = self.clone();
110 config.uri = build_uri(config.uri.clone()).into();
111
112 let compression = config.compression;
113 let batch = config.batch.use_size_as_bytes()?.get_settings_or_default(
114 BatchSettings::default()
115 .bytes(bytesize::mib(10u64))
116 .timeout(1),
117 );
118 let request = config.request.unwrap_with(&REQUEST_DEFAULTS);
119
120 let sink = BatchedHttpSink::new(
121 config,
122 Buffer::new(batch.size, compression),
123 request,
124 batch.timeout,
125 client.clone(),
126 cx.acker(),
127 )
128 .sink_map_err(|e| error!("Fatal http sink error: {}", e));
129
130 let sink = Box::new(sink);
131
132 match self.healthcheck_uri.clone() {
133 Some(healthcheck_uri) => {
134 let healthcheck = healthcheck(healthcheck_uri, self.auth.clone(), client)
135 .boxed()
136 .compat();
137 Ok((sink, Box::new(healthcheck)))
138 }
139 None => Ok((sink, Box::new(future::ok(())))),
140 }
141 }
142
input_type(&self) -> DataType143 fn input_type(&self) -> DataType {
144 DataType::Log
145 }
146
sink_type(&self) -> &'static str147 fn sink_type(&self) -> &'static str {
148 "http"
149 }
150 }
151
152 #[async_trait::async_trait]
153 impl HttpSink for HttpSinkConfig {
154 type Input = Vec<u8>;
155 type Output = Vec<u8>;
156
encode_event(&self, mut event: Event) -> Option<Self::Input>157 fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
158 self.encoding.apply_rules(&mut event);
159 let event = event.into_log();
160
161 let body = match &self.encoding.codec() {
162 Encoding::Text => {
163 if let Some(v) = event.get(&event::log_schema().message_key()) {
164 let mut b = v.to_string_lossy().into_bytes();
165 b.push(b'\n');
166 b
167 } else {
168 warn!(
169 message = "Event missing the message key; Dropping event.",
170 rate_limit_secs = 30,
171 );
172 return None;
173 }
174 }
175
176 Encoding::Ndjson => {
177 let mut b = serde_json::to_vec(&event)
178 .map_err(|e| panic!("Unable to encode into JSON: {}", e))
179 .ok()?;
180 b.push(b'\n');
181 b
182 }
183
184 Encoding::Json => {
185 let mut b = serde_json::to_vec(&event)
186 .map_err(|e| panic!("Unable to encode into JSON: {}", e))
187 .ok()?;
188 b.push(b',');
189 b
190 }
191 };
192
193 Some(body)
194 }
195
build_request(&self, mut body: Self::Output) -> crate::Result<http::Request<Vec<u8>>>196 async fn build_request(&self, mut body: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
197 let method = match &self.method.clone().unwrap_or(HttpMethod::Post) {
198 HttpMethod::Post => Method::POST,
199 HttpMethod::Put => Method::PUT,
200 };
201 let uri: Uri = self.uri.clone().into();
202
203 let ct = match self.encoding.codec() {
204 Encoding::Text => "text/plain",
205 Encoding::Ndjson => "application/x-ndjson",
206 Encoding::Json => {
207 body.insert(0, b'[');
208 body.pop(); // remove trailing comma from last record
209 body.push(b']');
210 "application/json"
211 }
212 };
213
214 let mut builder = Request::builder()
215 .method(method)
216 .uri(uri)
217 .header("Content-Type", ct);
218
219 if let Some(ce) = self.compression.content_encoding() {
220 builder = builder.header("Content-Encoding", ce);
221 }
222
223 if let Some(headers) = &self.headers {
224 for (header, value) in headers.iter() {
225 builder = builder.header(header.as_str(), value.as_str());
226 }
227 }
228
229 let mut request = builder.body(body).unwrap();
230
231 if let Some(auth) = &self.auth {
232 auth.apply(&mut request);
233 }
234
235 Ok(request)
236 }
237 }
238
healthcheck( uri: UriSerde, auth: Option<Auth>, mut client: HttpClient, ) -> crate::Result<()>239 async fn healthcheck(
240 uri: UriSerde,
241 auth: Option<Auth>,
242 mut client: HttpClient,
243 ) -> crate::Result<()> {
244 let uri = build_uri(uri);
245 let mut request = Request::head(&uri).body(Body::empty()).unwrap();
246
247 if let Some(auth) = auth {
248 auth.apply(&mut request);
249 }
250
251 let response = client.send(request).await?;
252
253 match response.status() {
254 StatusCode::OK => Ok(()),
255 status => Err(super::HealthcheckError::UnexpectedStatus { status }.into()),
256 }
257 }
258
validate_headers( headers: &Option<IndexMap<String, String>>, auth: &Option<Auth>, ) -> crate::Result<()>259 fn validate_headers(
260 headers: &Option<IndexMap<String, String>>,
261 auth: &Option<Auth>,
262 ) -> crate::Result<()> {
263 if let Some(map) = headers {
264 for (name, value) in map {
265 if auth.is_some() && name.eq_ignore_ascii_case("Authorization") {
266 return Err(
267 "Authorization header can not be used with defined auth options".into(),
268 );
269 }
270
271 HeaderName::from_bytes(name.as_bytes()).with_context(|| InvalidHeaderName { name })?;
272 HeaderValue::from_bytes(value.as_bytes())
273 .with_context(|| InvalidHeaderValue { value })?;
274 }
275 }
276 Ok(())
277 }
278
build_uri(base: UriSerde) -> Uri279 fn build_uri(base: UriSerde) -> Uri {
280 let base: Uri = base.into();
281 Uri::builder()
282 .scheme(base.scheme_str().unwrap_or("http"))
283 .authority(base.authority().map(|a| a.as_str()).unwrap_or("127.0.0.1"))
284 .path_and_query(base.path_and_query().map(|pq| pq.as_str()).unwrap_or(""))
285 .build()
286 .expect("bug building uri")
287 }
288
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        assert_downcast_matches,
        sinks::http::HttpSinkConfig,
        sinks::util::http::HttpSink,
        sinks::util::test::build_test_server,
        test_util::{next_addr, random_lines_with_stream, runtime, shutdown_on_idle},
        topology::config::SinkContext,
    };
    use bytes05::buf::BufExt;
    use futures01::{Sink, Stream};
    use headers::{Authorization, HeaderMapExt};
    use hyper::Method;
    use serde::Deserialize;
    use std::io::{BufRead, BufReader};

    #[test]
    fn http_encode_event_text() {
        let event = Event::from("hello world");

        let config = default_config(Encoding::Text);
        let bytes = config.encode_event(event).unwrap();

        // Text encoding emits only the message, newline-terminated.
        assert_eq!(bytes, Vec::from(&"hello world\n"[..]));
    }

    #[test]
    fn http_encode_event_ndjson() {
        let event = Event::from("hello world");

        let config = default_config(Encoding::Ndjson);
        let bytes = config.encode_event(event).unwrap();

        // Each NDJSON record is terminated by a newline.
        assert_eq!(bytes.last(), Some(&b'\n'));

        #[derive(Deserialize, Debug)]
        #[serde(deny_unknown_fields)]
        struct ExpectedEvent {
            message: String,
            timestamp: chrono::DateTime<chrono::Utc>,
        }

        let output = serde_json::from_slice::<ExpectedEvent>(&bytes[..]).unwrap();

        assert_eq!(output.message, "hello world".to_string());
    }

    #[test]
    fn http_encode_event_json() {
        let event = Event::from("hello world");

        let config = default_config(Encoding::Json);
        let bytes = config.encode_event(event).unwrap();

        // JSON records are comma-terminated so a batch can later be framed as an array.
        let (last, record) = bytes.split_last().unwrap();
        assert_eq!(*last, b',');

        let output: serde_json::Value = serde_json::from_slice(record).unwrap();
        assert_eq!(output["message"].as_str(), Some("hello world"));
    }

    #[test]
    fn http_validates_normal_headers() {
        // `$IN_ADDR` is never substituted here: only header validation is exercised.
        let config = r#"
        uri = "http://$IN_ADDR/frames"
        encoding = "text"
        [headers]
        Auth = "token:thing_and-stuff"
        X-Custom-Nonsense = "_%_{}_-_&_._`_|_~_!_#_&_$_"
        "#;
        let config: HttpSinkConfig = toml::from_str(&config).unwrap();

        assert!(super::validate_headers(&config.headers, &None).is_ok());
    }

    #[test]
    fn http_catches_bad_header_names() {
        let config = r#"
        uri = "http://$IN_ADDR/frames"
        encoding = "text"
        [headers]
        "\u0001" = "bad"
        "#;
        let config: HttpSinkConfig = toml::from_str(&config).unwrap();

        assert_downcast_matches!(
            super::validate_headers(&config.headers, &None).unwrap_err(),
            BuildError,
            BuildError::InvalidHeaderName{..}
        );
    }

    #[test]
    #[should_panic(expected = "Authorization header can not be used with defined auth options")]
    fn http_headers_auth_conflict() {
        let config = r#"
        uri = "http://$IN_ADDR/"
        encoding = "text"
        [headers]
        Authorization = "Basic base64encodedstring"
        [auth]
        strategy = "basic"
        user = "user"
        password = "password"
        "#;
        let config: HttpSinkConfig = toml::from_str(&config).unwrap();

        let cx = SinkContext::new_test();

        // `build` rejects the conflicting configuration; unwrapping triggers the panic.
        let _ = config.build(cx).unwrap();
    }

    #[test]
    fn http_happy_path_post() {
        let num_lines = 1000;

        let in_addr = next_addr();

        let config = r#"
        uri = "http://$IN_ADDR/frames"
        compression = "gzip"
        encoding = "ndjson"

        [auth]
        strategy = "basic"
        user = "waldo"
        password = "hunter2"
        "#
        .replace("$IN_ADDR", &format!("{}", in_addr));
        let config: HttpSinkConfig = toml::from_str(&config).unwrap();

        let mut rt = runtime();
        let cx = SinkContext::new_test();

        let (sink, _) = config.build(cx).unwrap();
        let (rx, trigger, server) = build_test_server(in_addr, &mut rt);

        let (input_lines, events) = random_lines_with_stream(100, num_lines);
        let pump = sink.send_all(events);

        rt.spawn(server);

        let _ = rt.block_on(pump).unwrap();
        // Stop the test server so the request stream below terminates.
        drop(trigger);

        let output_lines = rx
            .wait()
            .map(Result::unwrap)
            .map(|(parts, body)| {
                assert_eq!(Method::POST, parts.method);
                assert_eq!("/frames", parts.uri.path());
                assert_eq!(
                    Some(Authorization::basic("waldo", "hunter2")),
                    parts.headers.typed_get()
                );
                body.reader()
            })
            .map(flate2::read::GzDecoder::new)
            .map(BufReader::new)
            .flat_map(BufRead::lines)
            .map(Result::unwrap)
            .map(|s| {
                let val: serde_json::Value = serde_json::from_str(&s).unwrap();
                val.get("message").unwrap().as_str().unwrap().to_owned()
            })
            .collect::<Vec<_>>();

        shutdown_on_idle(rt);

        assert_eq!(num_lines, output_lines.len());
        assert_eq!(input_lines, output_lines);
    }

    #[test]
    fn http_happy_path_put() {
        let num_lines = 1000;

        let in_addr = next_addr();

        let config = r#"
        uri = "http://$IN_ADDR/frames"
        method = "put"
        compression = "gzip"
        encoding = "ndjson"

        [auth]
        strategy = "basic"
        user = "waldo"
        password = "hunter2"
        "#
        .replace("$IN_ADDR", &format!("{}", in_addr));
        let config: HttpSinkConfig = toml::from_str(&config).unwrap();

        let mut rt = runtime();
        let cx = SinkContext::new_test();

        let (sink, _) = config.build(cx).unwrap();
        let (rx, trigger, server) = build_test_server(in_addr, &mut rt);

        let (input_lines, events) = random_lines_with_stream(100, num_lines);
        let pump = sink.send_all(events);

        rt.spawn(server);

        let _ = rt.block_on(pump).unwrap();
        // Stop the test server so the request stream below terminates.
        drop(trigger);

        let output_lines = rx
            .wait()
            .map(Result::unwrap)
            .map(|(parts, body)| {
                assert_eq!(Method::PUT, parts.method);
                assert_eq!("/frames", parts.uri.path());
                assert_eq!(
                    Some(Authorization::basic("waldo", "hunter2")),
                    parts.headers.typed_get()
                );
                body.reader()
            })
            .map(flate2::read::GzDecoder::new)
            .map(BufReader::new)
            .flat_map(BufRead::lines)
            .map(Result::unwrap)
            .map(|s| {
                let val: serde_json::Value = serde_json::from_str(&s).unwrap();
                val.get("message").unwrap().as_str().unwrap().to_owned()
            })
            .collect::<Vec<_>>();

        shutdown_on_idle(rt);

        assert_eq!(num_lines, output_lines.len());
        assert_eq!(input_lines, output_lines);
    }

    #[test]
    fn http_passes_custom_headers() {
        let num_lines = 1000;

        let in_addr = next_addr();

        let config = r#"
        uri = "http://$IN_ADDR/frames"
        encoding = "ndjson"
        compression = "gzip"
        [headers]
        foo = "bar"
        baz = "quux"
        "#
        .replace("$IN_ADDR", &format!("{}", in_addr));
        let config: HttpSinkConfig = toml::from_str(&config).unwrap();

        let mut rt = runtime();
        let cx = SinkContext::new_test();

        let (sink, _) = config.build(cx).unwrap();
        let (rx, trigger, server) = build_test_server(in_addr, &mut rt);

        let (input_lines, events) = random_lines_with_stream(100, num_lines);
        let pump = sink.send_all(events);

        rt.spawn(server);

        let _ = rt.block_on(pump).unwrap();
        // Stop the test server so the request stream below terminates.
        drop(trigger);

        let output_lines = rx
            .wait()
            .map(Result::unwrap)
            .map(|(parts, body)| {
                assert_eq!(Method::POST, parts.method);
                assert_eq!("/frames", parts.uri.path());
                assert_eq!(
                    Some("bar"),
                    parts.headers.get("foo").map(|v| v.to_str().unwrap())
                );
                assert_eq!(
                    Some("quux"),
                    parts.headers.get("baz").map(|v| v.to_str().unwrap())
                );
                body.reader()
            })
            .map(flate2::read::GzDecoder::new)
            .map(BufReader::new)
            .flat_map(BufRead::lines)
            .map(Result::unwrap)
            .map(|s| {
                let val: serde_json::Value = serde_json::from_str(&s).unwrap();
                val.get("message").unwrap().as_str().unwrap().to_owned()
            })
            .collect::<Vec<_>>();

        shutdown_on_idle(rt);

        assert_eq!(num_lines, output_lines.len());
        assert_eq!(input_lines, output_lines);
    }
}
577