1 use std::convert::TryInto;
2 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
3
4 use clap::{App, Arg};
5 use futures::stream::StreamExt;
6 use hdrhistogram::Histogram;
7
8 use rdkafka::config::ClientConfig;
9 use rdkafka::consumer::{Consumer, StreamConsumer};
10 use rdkafka::message::Message;
11 use rdkafka::producer::{FutureProducer, FutureRecord};
12
13 use crate::example_utils::setup_logger;
14
15 mod example_utils;
16
17 #[tokio::main]
main()18 async fn main() {
19 let matches = App::new("Roundtrip example")
20 .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
21 .about("Measures latency between producer and consumer")
22 .arg(
23 Arg::with_name("brokers")
24 .short("b")
25 .long("brokers")
26 .help("Broker list in kafka format")
27 .takes_value(true)
28 .default_value("localhost:9092"),
29 )
30 .arg(
31 Arg::with_name("topic")
32 .long("topic")
33 .help("topic")
34 .takes_value(true)
35 .required(true),
36 )
37 .arg(
38 Arg::with_name("log-conf")
39 .long("log-conf")
40 .help("Configure the logging format (example: 'rdkafka=trace')")
41 .takes_value(true),
42 )
43 .get_matches();
44
45 setup_logger(true, matches.value_of("log-conf"));
46
47 let brokers = matches.value_of("brokers").unwrap();
48 let topic = matches.value_of("topic").unwrap().to_owned();
49
50 let producer: FutureProducer = ClientConfig::new()
51 .set("bootstrap.servers", brokers)
52 .set("message.timeout.ms", "5000")
53 .create()
54 .expect("Producer creation error");
55
56 let consumer: StreamConsumer = ClientConfig::new()
57 .set("bootstrap.servers", brokers)
58 .set("session.timeout.ms", "6000")
59 .set("enable.auto.commit", "false")
60 .set("group.id", "rust-rdkafka-roundtrip-example")
61 .create()
62 .expect("Consumer creation failed");
63 consumer.subscribe(&[&topic]).unwrap();
64
65 tokio::spawn(async move {
66 let mut i = 0_usize;
67 loop {
68 producer
69 .send_result(
70 FutureRecord::to(&topic)
71 .key(&i.to_string())
72 .payload("dummy")
73 .timestamp(now()),
74 )
75 .unwrap()
76 .await
77 .unwrap()
78 .unwrap();
79 i += 1;
80 }
81 });
82
83 let start = Instant::now();
84 let mut stream = consumer.start();
85 let mut latencies = Histogram::<u64>::new(5).unwrap();
86 println!("Warming up for 10s...");
87 while let Some(message) = stream.next().await {
88 let message = message.unwrap();
89 let then = message.timestamp().to_millis().unwrap();
90 if start.elapsed() < Duration::from_secs(10) {
91 // Warming up.
92 } else if start.elapsed() < Duration::from_secs(20) {
93 if latencies.len() == 0 {
94 println!("Recording for 10s...");
95 }
96 latencies += (now() - then) as u64;
97 } else {
98 break;
99 }
100 }
101
102 println!("measurements: {}", latencies.len());
103 println!("mean latency: {}ms", latencies.mean());
104 println!("p50 latency: {}ms", latencies.value_at_quantile(0.50));
105 println!("p90 latency: {}ms", latencies.value_at_quantile(0.90));
106 println!("p99 latency: {}ms", latencies.value_at_quantile(0.99));
107 }
108
now() -> i64109 fn now() -> i64 {
110 SystemTime::now()
111 .duration_since(UNIX_EPOCH)
112 .unwrap()
113 .as_millis()
114 .try_into()
115 .unwrap()
116 }
117