1 use crate::config::*;
2 use crate::errors::*;
3 use crate::varz::*;
4 
5 #[allow(unused_imports)]
6 use futures::prelude::*;
7 use hyper::header::CONTENT_TYPE;
8 use hyper::server::conn::Http;
9 use hyper::service::service_fn;
10 use hyper::{Body, Request, Response, StatusCode};
11 use prometheus::{self, Encoder, TextEncoder};
12 use std::mem;
13 use std::sync::atomic::{AtomicU32, Ordering};
14 use std::sync::Arc;
15 use tokio::net::TcpListener;
16 use tokio::runtime::Handle;
17 
18 const METRICS_CONNECTION_TIMEOUT_SECS: u64 = 10;
19 const METRICS_MAX_CONCURRENT_CONNECTIONS: u32 = 2;
20 
handle_client_connection( req: Request<Body>, varz: Varz, path: Arc<String>, ) -> Result<Response<Body>, Error>21 async fn handle_client_connection(
22     req: Request<Body>,
23     varz: Varz,
24     path: Arc<String>,
25 ) -> Result<Response<Body>, Error> {
26     let mut buffer = vec![];
27     if req.uri().path() != path.as_str() {
28         let response = Response::builder()
29             .status(StatusCode::NOT_FOUND)
30             .body(Body::empty())?;
31         return Ok(response);
32     }
33     let StartInstant(start_instant) = varz.start_instant;
34     let uptime = start_instant.elapsed().as_secs();
35     varz.uptime.set(uptime as _);
36     let client_queries = varz.client_queries_udp.get() + varz.client_queries_tcp.get();
37     varz.client_queries.set(client_queries as _);
38     let metric_families = prometheus::gather();
39     let encoder = TextEncoder::new();
40     encoder.encode(&metric_families, &mut buffer)?;
41     let response = Response::builder()
42         .header(CONTENT_TYPE, encoder.format_type())
43         .body(buffer.into())?;
44     Ok(response)
45 }
46 
47 #[allow(unreachable_code)]
prometheus_service( varz: Varz, metrics_config: MetricsConfig, runtime_handle: Handle, ) -> Result<(), Error>48 pub async fn prometheus_service(
49     varz: Varz,
50     metrics_config: MetricsConfig,
51     runtime_handle: Handle,
52 ) -> Result<(), Error> {
53     let path = Arc::new(metrics_config.path);
54     let stream = TcpListener::bind(metrics_config.listen_addr).await?;
55     let concurrent_connections = Arc::new(AtomicU32::new(0));
56     loop {
57         let (client, _client_addr) = stream.accept().await?;
58         let count = concurrent_connections.fetch_add(1, Ordering::Relaxed);
59         if count >= METRICS_MAX_CONCURRENT_CONNECTIONS {
60             concurrent_connections.fetch_sub(1, Ordering::Relaxed);
61             mem::drop(client);
62             continue;
63         }
64         let path = path.clone();
65         let varz = varz.clone();
66         let service =
67             service_fn(move |req| handle_client_connection(req, varz.clone(), path.clone()));
68         let connection = Http::new().serve_connection(client, service);
69         let concurrent_connections = concurrent_connections.clone();
70         runtime_handle.spawn(
71             tokio::time::timeout(
72                 std::time::Duration::from_secs(METRICS_CONNECTION_TIMEOUT_SECS),
73                 connection,
74             )
75             .map(move |_| {
76                 concurrent_connections.fetch_sub(1, Ordering::Relaxed);
77             }),
78         );
79     }
80     Ok(())
81 }
82