1 use crate::{
2     event::{self, Event},
3     sinks::util::{
4         encoding::{EncodingConfig, EncodingConfiguration},
5         http::{Auth, BatchedHttpSink, HttpClient, HttpSink},
6         service2::TowerRequestConfig,
7         BatchConfig, BatchSettings, Buffer, Compression, UriSerde,
8     },
9     tls::{TlsOptions, TlsSettings},
10     topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
11 };
12 use futures::{FutureExt, TryFutureExt};
13 use futures01::{future, Sink};
14 use http::{
15     header::{self, HeaderName, HeaderValue},
16     Method, Request, StatusCode, Uri,
17 };
18 use hyper::Body;
19 use indexmap::IndexMap;
20 use lazy_static::lazy_static;
21 use serde::{Deserialize, Serialize};
22 use snafu::{ResultExt, Snafu};
23 
24 #[derive(Debug, Snafu)]
25 enum BuildError {
26     #[snafu(display("{}: {}", source, name))]
27     InvalidHeaderName {
28         name: String,
29         source: header::InvalidHeaderName,
30     },
31     #[snafu(display("{}: {}", source, value))]
32     InvalidHeaderValue {
33         value: String,
34         source: header::InvalidHeaderValue,
35     },
36 }
37 
38 #[derive(Deserialize, Serialize, Clone, Debug)]
39 #[serde(deny_unknown_fields)]
40 pub struct HttpSinkConfig {
41     pub uri: UriSerde,
42     pub method: Option<HttpMethod>,
43     pub healthcheck_uri: Option<UriSerde>,
44     pub auth: Option<Auth>,
45     pub headers: Option<IndexMap<String, String>>,
46     #[serde(default)]
47     pub compression: Compression,
48     pub encoding: EncodingConfig<Encoding>,
49     #[serde(default)]
50     pub batch: BatchConfig,
51     #[serde(default)]
52     pub request: TowerRequestConfig,
53     pub tls: Option<TlsOptions>,
54 }
55 
/// Test helper: a config with every option at its default and only the
/// encoding codec chosen by the caller.
#[cfg(test)]
fn default_config(e: Encoding) -> HttpSinkConfig {
    HttpSinkConfig {
        encoding: EncodingConfig::from(e),
        uri: UriSerde::default(),
        method: None,
        healthcheck_uri: None,
        auth: None,
        headers: None,
        tls: None,
        compression: Compression::default(),
        batch: BatchConfig::default(),
        request: TowerRequestConfig::default(),
    }
}
71 
72 lazy_static! {
73     static ref REQUEST_DEFAULTS: TowerRequestConfig = TowerRequestConfig {
74         in_flight_limit: Some(10),
75         timeout_secs: Some(30),
76         rate_limit_num: Some(10),
77         ..Default::default()
78     };
79 }
80 
81 #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
82 #[serde(rename_all = "snake_case")]
83 #[derivative(Default)]
84 pub enum HttpMethod {
85     #[derivative(Default)]
86     Post,
87     Put,
88 }
89 
90 #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
91 #[serde(rename_all = "snake_case")]
92 pub enum Encoding {
93     Text,
94     Ndjson,
95     Json,
96 }
97 
98 inventory::submit! {
99     SinkDescription::new_without_default::<HttpSinkConfig>("http")
100 }
101 
102 #[typetag::serde(name = "http")]
103 impl SinkConfig for HttpSinkConfig {
build(&self, cx: SinkContext) -> crate::Result<(super::RouterSink, super::Healthcheck)>104     fn build(&self, cx: SinkContext) -> crate::Result<(super::RouterSink, super::Healthcheck)> {
105         validate_headers(&self.headers, &self.auth)?;
106         let tls = TlsSettings::from_options(&self.tls)?;
107         let client = HttpClient::new(cx.resolver(), tls)?;
108 
109         let mut config = self.clone();
110         config.uri = build_uri(config.uri.clone()).into();
111 
112         let compression = config.compression;
113         let batch = config.batch.use_size_as_bytes()?.get_settings_or_default(
114             BatchSettings::default()
115                 .bytes(bytesize::mib(10u64))
116                 .timeout(1),
117         );
118         let request = config.request.unwrap_with(&REQUEST_DEFAULTS);
119 
120         let sink = BatchedHttpSink::new(
121             config,
122             Buffer::new(batch.size, compression),
123             request,
124             batch.timeout,
125             client.clone(),
126             cx.acker(),
127         )
128         .sink_map_err(|e| error!("Fatal http sink error: {}", e));
129 
130         let sink = Box::new(sink);
131 
132         match self.healthcheck_uri.clone() {
133             Some(healthcheck_uri) => {
134                 let healthcheck = healthcheck(healthcheck_uri, self.auth.clone(), client)
135                     .boxed()
136                     .compat();
137                 Ok((sink, Box::new(healthcheck)))
138             }
139             None => Ok((sink, Box::new(future::ok(())))),
140         }
141     }
142 
input_type(&self) -> DataType143     fn input_type(&self) -> DataType {
144         DataType::Log
145     }
146 
sink_type(&self) -> &'static str147     fn sink_type(&self) -> &'static str {
148         "http"
149     }
150 }
151 
152 #[async_trait::async_trait]
153 impl HttpSink for HttpSinkConfig {
154     type Input = Vec<u8>;
155     type Output = Vec<u8>;
156 
encode_event(&self, mut event: Event) -> Option<Self::Input>157     fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
158         self.encoding.apply_rules(&mut event);
159         let event = event.into_log();
160 
161         let body = match &self.encoding.codec() {
162             Encoding::Text => {
163                 if let Some(v) = event.get(&event::log_schema().message_key()) {
164                     let mut b = v.to_string_lossy().into_bytes();
165                     b.push(b'\n');
166                     b
167                 } else {
168                     warn!(
169                         message = "Event missing the message key; Dropping event.",
170                         rate_limit_secs = 30,
171                     );
172                     return None;
173                 }
174             }
175 
176             Encoding::Ndjson => {
177                 let mut b = serde_json::to_vec(&event)
178                     .map_err(|e| panic!("Unable to encode into JSON: {}", e))
179                     .ok()?;
180                 b.push(b'\n');
181                 b
182             }
183 
184             Encoding::Json => {
185                 let mut b = serde_json::to_vec(&event)
186                     .map_err(|e| panic!("Unable to encode into JSON: {}", e))
187                     .ok()?;
188                 b.push(b',');
189                 b
190             }
191         };
192 
193         Some(body)
194     }
195 
build_request(&self, mut body: Self::Output) -> crate::Result<http::Request<Vec<u8>>>196     async fn build_request(&self, mut body: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
197         let method = match &self.method.clone().unwrap_or(HttpMethod::Post) {
198             HttpMethod::Post => Method::POST,
199             HttpMethod::Put => Method::PUT,
200         };
201         let uri: Uri = self.uri.clone().into();
202 
203         let ct = match self.encoding.codec() {
204             Encoding::Text => "text/plain",
205             Encoding::Ndjson => "application/x-ndjson",
206             Encoding::Json => {
207                 body.insert(0, b'[');
208                 body.pop(); // remove trailing comma from last record
209                 body.push(b']');
210                 "application/json"
211             }
212         };
213 
214         let mut builder = Request::builder()
215             .method(method)
216             .uri(uri)
217             .header("Content-Type", ct);
218 
219         if let Some(ce) = self.compression.content_encoding() {
220             builder = builder.header("Content-Encoding", ce);
221         }
222 
223         if let Some(headers) = &self.headers {
224             for (header, value) in headers.iter() {
225                 builder = builder.header(header.as_str(), value.as_str());
226             }
227         }
228 
229         let mut request = builder.body(body).unwrap();
230 
231         if let Some(auth) = &self.auth {
232             auth.apply(&mut request);
233         }
234 
235         Ok(request)
236     }
237 }
238 
healthcheck( uri: UriSerde, auth: Option<Auth>, mut client: HttpClient, ) -> crate::Result<()>239 async fn healthcheck(
240     uri: UriSerde,
241     auth: Option<Auth>,
242     mut client: HttpClient,
243 ) -> crate::Result<()> {
244     let uri = build_uri(uri);
245     let mut request = Request::head(&uri).body(Body::empty()).unwrap();
246 
247     if let Some(auth) = auth {
248         auth.apply(&mut request);
249     }
250 
251     let response = client.send(request).await?;
252 
253     match response.status() {
254         StatusCode::OK => Ok(()),
255         status => Err(super::HealthcheckError::UnexpectedStatus { status }.into()),
256     }
257 }
258 
validate_headers( headers: &Option<IndexMap<String, String>>, auth: &Option<Auth>, ) -> crate::Result<()>259 fn validate_headers(
260     headers: &Option<IndexMap<String, String>>,
261     auth: &Option<Auth>,
262 ) -> crate::Result<()> {
263     if let Some(map) = headers {
264         for (name, value) in map {
265             if auth.is_some() && name.eq_ignore_ascii_case("Authorization") {
266                 return Err(
267                     "Authorization header can not be used with defined auth options".into(),
268                 );
269             }
270 
271             HeaderName::from_bytes(name.as_bytes()).with_context(|| InvalidHeaderName { name })?;
272             HeaderValue::from_bytes(value.as_bytes())
273                 .with_context(|| InvalidHeaderValue { value })?;
274         }
275     }
276     Ok(())
277 }
278 
build_uri(base: UriSerde) -> Uri279 fn build_uri(base: UriSerde) -> Uri {
280     let base: Uri = base.into();
281     Uri::builder()
282         .scheme(base.scheme_str().unwrap_or("http"))
283         .authority(base.authority().map(|a| a.as_str()).unwrap_or("127.0.0.1"))
284         .path_and_query(base.path_and_query().map(|pq| pq.as_str()).unwrap_or(""))
285         .build()
286         .expect("bug building uri")
287 }
288 
289 #[cfg(test)]
290 mod tests {
291     use super::*;
292     use crate::{
293         assert_downcast_matches,
294         sinks::http::HttpSinkConfig,
295         sinks::util::http::HttpSink,
296         sinks::util::test::build_test_server,
297         test_util::{next_addr, random_lines_with_stream, runtime, shutdown_on_idle},
298         topology::config::SinkContext,
299     };
300     use bytes05::buf::BufExt;
301     use futures01::{Sink, Stream};
302     use headers::{Authorization, HeaderMapExt};
303     use hyper::Method;
304     use serde::Deserialize;
305     use std::io::{BufRead, BufReader};
306 
307     #[test]
http_encode_event_text()308     fn http_encode_event_text() {
309         let encoding = EncodingConfig::from(Encoding::Text);
310         let event = Event::from("hello world");
311 
312         let mut config = default_config(Encoding::Text);
313         config.encoding = encoding;
314         let bytes = config.encode_event(event).unwrap();
315 
316         assert_eq!(bytes, Vec::from(&"hello world\n"[..]));
317     }
318 
319     #[test]
http_encode_event_json()320     fn http_encode_event_json() {
321         let encoding = EncodingConfig::from(Encoding::Ndjson);
322         let event = Event::from("hello world");
323 
324         let mut config = default_config(Encoding::Json);
325         config.encoding = encoding;
326         let bytes = config.encode_event(event).unwrap();
327 
328         #[derive(Deserialize, Debug)]
329         #[serde(deny_unknown_fields)]
330         struct ExpectedEvent {
331             message: String,
332             timestamp: chrono::DateTime<chrono::Utc>,
333         }
334 
335         let output = serde_json::from_slice::<ExpectedEvent>(&bytes[..]).unwrap();
336 
337         assert_eq!(output.message, "hello world".to_string());
338     }
339 
340     #[test]
http_validates_normal_headers()341     fn http_validates_normal_headers() {
342         let config = r#"
343         uri = "http://$IN_ADDR/frames"
344         encoding = "text"
345         [headers]
346         Auth = "token:thing_and-stuff"
347         X-Custom-Nonsense = "_%_{}_-_&_._`_|_~_!_#_&_$_"
348         "#;
349         let config: HttpSinkConfig = toml::from_str(&config).unwrap();
350 
351         assert!(super::validate_headers(&config.headers, &None).is_ok());
352     }
353 
354     #[test]
http_catches_bad_header_names()355     fn http_catches_bad_header_names() {
356         let config = r#"
357         uri = "http://$IN_ADDR/frames"
358         encoding = "text"
359         [headers]
360         "\u0001" = "bad"
361         "#;
362         let config: HttpSinkConfig = toml::from_str(&config).unwrap();
363 
364         assert_downcast_matches!(
365             super::validate_headers(&config.headers, &None).unwrap_err(),
366             BuildError,
367             BuildError::InvalidHeaderName{..}
368         );
369     }
370 
371     #[test]
372     #[should_panic(expected = "Authorization header can not be used with defined auth options")]
http_headers_auth_conflict()373     fn http_headers_auth_conflict() {
374         let config = r#"
375         uri = "http://$IN_ADDR/"
376         encoding = "text"
377         [headers]
378         Authorization = "Basic base64encodedstring"
379         [auth]
380         strategy = "basic"
381         user = "user"
382         password = "password"
383         "#;
384         let config: HttpSinkConfig = toml::from_str(&config).unwrap();
385 
386         let cx = SinkContext::new_test();
387 
388         let _ = config.build(cx).unwrap();
389     }
390 
391     #[test]
http_happy_path_post()392     fn http_happy_path_post() {
393         let num_lines = 1000;
394 
395         let in_addr = next_addr();
396 
397         let config = r#"
398         uri = "http://$IN_ADDR/frames"
399         compression = "gzip"
400         encoding = "ndjson"
401 
402         [auth]
403         strategy = "basic"
404         user = "waldo"
405         password = "hunter2"
406     "#
407         .replace("$IN_ADDR", &format!("{}", in_addr));
408         let config: HttpSinkConfig = toml::from_str(&config).unwrap();
409 
410         let mut rt = runtime();
411         let cx = SinkContext::new_test();
412 
413         let (sink, _) = config.build(cx).unwrap();
414         let (rx, trigger, server) = build_test_server(in_addr, &mut rt);
415 
416         let (input_lines, events) = random_lines_with_stream(100, num_lines);
417         let pump = sink.send_all(events);
418 
419         rt.spawn(server);
420 
421         let _ = rt.block_on(pump).unwrap();
422         drop(trigger);
423 
424         let output_lines = rx
425             .wait()
426             .map(Result::unwrap)
427             .map(|(parts, body)| {
428                 assert_eq!(Method::POST, parts.method);
429                 assert_eq!("/frames", parts.uri.path());
430                 assert_eq!(
431                     Some(Authorization::basic("waldo", "hunter2")),
432                     parts.headers.typed_get()
433                 );
434                 body.reader()
435             })
436             .map(flate2::read::GzDecoder::new)
437             .map(BufReader::new)
438             .flat_map(BufRead::lines)
439             .map(Result::unwrap)
440             .map(|s| {
441                 let val: serde_json::Value = serde_json::from_str(&s).unwrap();
442                 val.get("message").unwrap().as_str().unwrap().to_owned()
443             })
444             .collect::<Vec<_>>();
445 
446         shutdown_on_idle(rt);
447 
448         assert_eq!(num_lines, output_lines.len());
449         assert_eq!(input_lines, output_lines);
450     }
451 
452     #[test]
http_happy_path_put()453     fn http_happy_path_put() {
454         let num_lines = 1000;
455 
456         let in_addr = next_addr();
457 
458         let config = r#"
459         uri = "http://$IN_ADDR/frames"
460         method = "put"
461         compression = "gzip"
462         encoding = "ndjson"
463 
464         [auth]
465         strategy = "basic"
466         user = "waldo"
467         password = "hunter2"
468     "#
469         .replace("$IN_ADDR", &format!("{}", in_addr));
470         let config: HttpSinkConfig = toml::from_str(&config).unwrap();
471 
472         let mut rt = runtime();
473         let cx = SinkContext::new_test();
474 
475         let (sink, _) = config.build(cx).unwrap();
476         let (rx, trigger, server) = build_test_server(in_addr, &mut rt);
477 
478         let (input_lines, events) = random_lines_with_stream(100, num_lines);
479         let pump = sink.send_all(events);
480 
481         rt.spawn(server);
482 
483         let _ = rt.block_on(pump).unwrap();
484         drop(trigger);
485 
486         let output_lines = rx
487             .wait()
488             .map(Result::unwrap)
489             .map(|(parts, body)| {
490                 assert_eq!(Method::PUT, parts.method);
491                 assert_eq!("/frames", parts.uri.path());
492                 assert_eq!(
493                     Some(Authorization::basic("waldo", "hunter2")),
494                     parts.headers.typed_get()
495                 );
496                 body.reader()
497             })
498             .map(flate2::read::GzDecoder::new)
499             .map(BufReader::new)
500             .flat_map(BufRead::lines)
501             .map(Result::unwrap)
502             .map(|s| {
503                 let val: serde_json::Value = serde_json::from_str(&s).unwrap();
504                 val.get("message").unwrap().as_str().unwrap().to_owned()
505             })
506             .collect::<Vec<_>>();
507 
508         shutdown_on_idle(rt);
509 
510         assert_eq!(num_lines, output_lines.len());
511         assert_eq!(input_lines, output_lines);
512     }
513 
514     #[test]
http_passes_custom_headers()515     fn http_passes_custom_headers() {
516         let num_lines = 1000;
517 
518         let in_addr = next_addr();
519 
520         let config = r#"
521         uri = "http://$IN_ADDR/frames"
522         encoding = "ndjson"
523         compression = "gzip"
524         [headers]
525         foo = "bar"
526         baz = "quux"
527     "#
528         .replace("$IN_ADDR", &format!("{}", in_addr));
529         let config: HttpSinkConfig = toml::from_str(&config).unwrap();
530 
531         let mut rt = runtime();
532         let cx = SinkContext::new_test();
533 
534         let (sink, _) = config.build(cx).unwrap();
535         let (rx, trigger, server) = build_test_server(in_addr, &mut rt);
536 
537         let (input_lines, events) = random_lines_with_stream(100, num_lines);
538         let pump = sink.send_all(events);
539 
540         rt.spawn(server);
541 
542         let _ = rt.block_on(pump).unwrap();
543         drop(trigger);
544 
545         let output_lines = rx
546             .wait()
547             .map(Result::unwrap)
548             .map(|(parts, body)| {
549                 assert_eq!(Method::POST, parts.method);
550                 assert_eq!("/frames", parts.uri.path());
551                 assert_eq!(
552                     Some("bar"),
553                     parts.headers.get("foo").map(|v| v.to_str().unwrap())
554                 );
555                 assert_eq!(
556                     Some("quux"),
557                     parts.headers.get("baz").map(|v| v.to_str().unwrap())
558                 );
559                 body.reader()
560             })
561             .map(flate2::read::GzDecoder::new)
562             .map(BufReader::new)
563             .flat_map(BufRead::lines)
564             .map(Result::unwrap)
565             .map(|s| {
566                 let val: serde_json::Value = serde_json::from_str(&s).unwrap();
567                 val.get("message").unwrap().as_str().unwrap().to_owned()
568             })
569             .collect::<Vec<_>>();
570 
571         shutdown_on_idle(rt);
572 
573         assert_eq!(num_lines, output_lines.len());
574         assert_eq!(input_lines, output_lines);
575     }
576 }
577