1 //! Kafka Producer - A higher-level API for sending messages to Kafka
2 //! topics.
3 //!
4 //! This module hosts a multi-topic capable producer for a Kafka
5 //! cluster providing a convenient API for sending messages
6 //! synchronously.
7 //!
8 //! In Kafka, each message is a key/value pair where one or the other
9 //! is optional. A `Record` represents all the data necessary to
10 //! produce such a message to Kafka using the `Producer`. It
11 //! specifies the target topic and the target partition the message is
12 //! supposed to be delivered to as well as the key and the value.
13 //!
14 //! # Example
15 //! ```no_run
16 //! use std::fmt::Write;
17 //! use std::time::Duration;
18 //! use kafka::producer::{Producer, Record, RequiredAcks};
19 //!
20 //! let mut producer =
21 //! Producer::from_hosts(vec!("localhost:9092".to_owned()))
22 //! .with_ack_timeout(Duration::from_secs(1))
23 //! .with_required_acks(RequiredAcks::One)
24 //! .create()
25 //! .unwrap();
26 //!
27 //! let mut buf = String::with_capacity(2);
28 //! for i in 0..10 {
29 //! let _ = write!(&mut buf, "{}", i); // some computation of the message data to be sent
30 //! producer.send(&Record::from_value("my-topic", buf.as_bytes())).unwrap();
31 //! buf.clear();
32 //! }
33 //! ```
34 //!
35 //! In this example, when the `producer.send(..)` returns
36 //! successfully, we are guaranteed the message is delivered to Kafka
37 //! and persisted by at least one Kafka broker. However, when sending
38 //! multiple messages just like in this example, it is more efficient
39 //! to send them in batches using `Producer::send_all`.
40 //!
//! Since some of a `Record`'s attributes are optional, convenience
//! methods exist to ease their creation. In this example, the call
//! to `Record::from_value` creates a key-less, value-only record with
//! an unspecified partition. The `Record` struct, however, is
//! intended to provide full control over its lifecycle to client
//! code, and, hence, is fully open. Its current constructor methods
//! are provided for convenience only.
48 //!
//! Besides the target topic, key, and value of a `Record`, client
//! code is allowed to specify the topic partition the message is
//! supposed to be delivered to. If the partition of a `Record` is
//! not specified (more precisely, if it is negative), `Producer`
//! will rely on its underlying `Partitioner` to find a suitable
//! one. A `Partitioner` implementation can be supplied by client
//! code at the `Producer`'s construction time and defaults to
//! `DefaultPartitioner`. See its documentation for the strategy it
//! uses to find a partition.
58
59 // XXX 1) rethink return values for the send_all() method
60 // XXX 2) Handle recoverable errors behind the scenes through retry attempts
61
62 use std::collections::HashMap;
63 use std::fmt;
64 use std::hash::{Hasher, BuildHasher, BuildHasherDefault};
65 use std::time::Duration;
66 use client::{self, KafkaClient};
67 use error::{ErrorKind, Result};
68 use ref_slice::ref_slice;
69 use twox_hash::XxHash32;
70
71 #[cfg(feature = "security")]
72 use client::SecurityConfig;
73
// Without the "security" feature there is no real `SecurityConfig`;
// this unit placeholder keeps `Option<SecurityConfig>` (stored by
// `Builder` and passed to `new_kafka_client`) well-typed.
#[cfg(not(feature = "security"))]
type SecurityConfig = ();
76 use client_internals::KafkaClientInternals;
77 use protocol;
78
79 // public re-exports
80 pub use client::{Compression, RequiredAcks, ProduceConfirm, ProducePartitionConfirm};
81
/// Default for `Builder::with_ack_timeout`: thirty seconds, expressed
/// in milliseconds.
pub const DEFAULT_ACK_TIMEOUT_MILLIS: u64 = 30_000;
84
/// The default value for `Builder::with_required_acks`, i.e.
/// `RequiredAcks::One`.
pub const DEFAULT_REQUIRED_ACKS: RequiredAcks = RequiredAcks::One;
87
88 // --------------------------------------------------------------------
89
/// Exposes the raw bytes of a `Record`'s key or value.
///
/// `Producer` uses this trait to obtain the bytes behind `Record::key`
/// and `Record::value`, which leaves the concrete key/value types up to
/// the client.
pub trait AsBytes {
    /// Returns a byte view of `self`.
    fn as_bytes(&self) -> &[u8];
}

/// The unit type stands for "no data" and yields an empty slice.
impl AsBytes for () {
    fn as_bytes(&self) -> &[u8] {
        b""
    }
}

// A blanket `impl<T: AsRef<[u8]>> AsBytes for T` was attempted but ran
// into compiler problems (NOTE(review): most likely an overlap with the
// `()` impl above under the coherence rules - confirm). Until that is
// resolved, a handful of common standard-library types get explicit
// impls below.

impl AsBytes for String {
    fn as_bytes(&self) -> &[u8] {
        String::as_bytes(self)
    }
}

impl AsBytes for Vec<u8> {
    fn as_bytes(&self) -> &[u8] {
        self.as_slice()
    }
}

impl<'a> AsBytes for &'a [u8] {
    fn as_bytes(&self) -> &[u8] {
        *self
    }
}

impl<'a> AsBytes for &'a str {
    fn as_bytes(&self) -> &[u8] {
        (*self).as_bytes()
    }
}
130
131 // --------------------------------------------------------------------
132
/// A message to be sent to Kafka through the `Producer` API.
///
/// A record is essentially a key/value pair addressed to a topic and,
/// optionally, to a specific partition of that topic.
pub struct Record<'a, K, V> {
    /// Key data of this (message) record.
    pub key: K,

    /// Value data of this (message) record.
    pub value: V,

    /// Name of the topic this message is to be delivered to.
    pub topic: &'a str,

    /// The id of the topic partition to deliver this message to.
    /// A negative value means "unspecified"; a `Producer` will then
    /// typically derive a partition on its own.
    pub partition: i32,
}

impl<'a, K, V> Record<'a, K, V> {
    /// Creates a key/value record for `topic` whose partition is left
    /// unspecified (set to `-1`).
    #[inline]
    pub fn from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V> {
        Record {
            topic: topic,
            partition: -1,
            key: key,
            value: value,
        }
    }

    /// Returns this record retargeted at the given `partition`.
    #[inline]
    pub fn with_partition(self, partition: i32) -> Self {
        Record {
            partition: partition,
            ..self
        }
    }
}

impl<'a, V> Record<'a, (), V> {
    /// Creates a key-less, value-only record for `topic` whose
    /// partition is left unspecified (set to `-1`).
    #[inline]
    pub fn from_value(topic: &'a str, value: V) -> Record<'a, (), V> {
        Record::from_key_value(topic, (), value)
    }
}

impl<'a, K: fmt::Debug, V: fmt::Debug> fmt::Debug for Record<'a, K, V> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Record { ref key, ref value, topic, partition } = *self;
        write!(
            f,
            "Record {{ topic: {}, partition: {}, key: {:?}, value: {:?} }}",
            topic,
            partition,
            key,
            value
        )
    }
}
202
203 // --------------------------------------------------------------------
204
/// The Kafka Producer
///
/// See module level documentation.
///
/// Owns the underlying `KafkaClient`; `P` is the `Partitioner` that is
/// consulted for every record before it is sent.
pub struct Producer<P = DefaultPartitioner> {
    /// The client all produce requests are sent through.
    client: KafkaClient,
    /// Per-topic partition information plus the partitioner.
    state: State<P>,
    /// Ack settings applied to every produce request.
    config: Config,
}

/// Producer-internal state, built by `State::new` when the producer is
/// created.
struct State<P> {
    /// Partition information (available ids and total count) for each
    /// topic the client knew about when the producer was created.
    partitions: HashMap<String, Partitions>,
    /// The partitioner to decide how to distribute messages
    partitioner: P,
}

/// Producer-level settings derived from the `Builder`.
struct Config {
    /// The maximum time to wait for acknowledgements, in milliseconds
    /// (converted from a `Duration` in `Builder::create`). See
    /// `KafkaClient::produce_messages`.
    ack_timeout: i32,
    /// The number of acks to request (the `RequiredAcks` value cast to
    /// `i16`). See `KafkaClient::produce_messages`.
    required_acks: i16,
}
229
230 impl Producer {
231 /// Starts building a new producer using the given Kafka client.
from_client(client: KafkaClient) -> Builder<DefaultPartitioner>232 pub fn from_client(client: KafkaClient) -> Builder<DefaultPartitioner> {
233 Builder::new(Some(client), Vec::new())
234 }
235
236 /// Starts building a producer bootstraping internally a new kafka
237 /// client from the given kafka hosts.
from_hosts(hosts: Vec<String>) -> Builder<DefaultPartitioner>238 pub fn from_hosts(hosts: Vec<String>) -> Builder<DefaultPartitioner> {
239 Builder::new(None, hosts)
240 }
241
242 /// Borrows the underlying kafka client.
client(&self) -> &KafkaClient243 pub fn client(&self) -> &KafkaClient {
244 &self.client
245 }
246
247 /// Destroys this producer returning the underlying kafka client.
into_client(self) -> KafkaClient248 pub fn into_client(self) -> KafkaClient {
249 self.client
250 }
251 }
252
253
254 impl<P: Partitioner> Producer<P> {
255 /// Synchronously send the specified message to Kafka.
send<'a, K, V>(&mut self, rec: &Record<'a, K, V>) -> Result<()> where K: AsBytes, V: AsBytes,256 pub fn send<'a, K, V>(&mut self, rec: &Record<'a, K, V>) -> Result<()>
257 where
258 K: AsBytes,
259 V: AsBytes,
260 {
261 let mut rs = try!(self.send_all(ref_slice(rec)));
262
263 if self.config.required_acks == 0 {
264 // ~ with no required_acks we get no response and
265 // consider the send-data request blindly as successful
266 Ok(())
267 } else {
268 assert_eq!(1, rs.len());
269 let mut produce_confirm = rs.pop().unwrap();
270
271 assert_eq!(1, produce_confirm.partition_confirms.len());
272 produce_confirm
273 .partition_confirms
274 .pop()
275 .unwrap()
276 .offset
277 .map(|_| ())
278 .map_err(|err| ErrorKind::Kafka(err).into())
279 }
280 }
281
282 /// Synchronously send all of the specified messages to Kafka. To validate
283 /// that all of the specified records have been successfully delivered,
284 /// inspection of the offsets on the returned confirms is necessary.
send_all<'a, K, V>(&mut self, recs: &[Record<'a, K, V>]) -> Result<Vec<ProduceConfirm>> where K: AsBytes, V: AsBytes,285 pub fn send_all<'a, K, V>(&mut self, recs: &[Record<'a, K, V>]) -> Result<Vec<ProduceConfirm>>
286 where
287 K: AsBytes,
288 V: AsBytes,
289 {
290 let partitioner = &mut self.state.partitioner;
291 let partitions = &self.state.partitions;
292 let client = &mut self.client;
293 let config = &self.config;
294
295 client.internal_produce_messages(
296 config.required_acks,
297 config.ack_timeout,
298 recs.into_iter().map(|r| {
299 let mut m = client::ProduceMessage {
300 key: to_option(r.key.as_bytes()),
301 value: to_option(r.value.as_bytes()),
302 topic: r.topic,
303 partition: r.partition,
304 };
305 partitioner.partition(Topics::new(partitions), &mut m);
306 m
307 }),
308 )
309 }
310 }
311
/// Maps an empty byte slice to `None` and any other slice to `Some`.
/// `send_all` uses this so that empty keys and values are sent as
/// absent rather than as zero-length data.
fn to_option(data: &[u8]) -> Option<&[u8]> {
    match data.len() {
        0 => None,
        _ => Some(data),
    }
}
315
316 // --------------------------------------------------------------------
317
318 impl<P> State<P> {
new(client: &mut KafkaClient, partitioner: P) -> Result<State<P>>319 fn new(client: &mut KafkaClient, partitioner: P) -> Result<State<P>> {
320 let ts = client.topics();
321 let mut ids = HashMap::with_capacity(ts.len());
322 for t in ts {
323 let ps = t.partitions();
324 ids.insert(
325 t.name().to_owned(),
326 Partitions {
327 available_ids: ps.available_ids(),
328 num_all_partitions: ps.len() as u32,
329 },
330 );
331 }
332 Ok(State {
333 partitions: ids,
334 partitioner: partitioner,
335 })
336 }
337 }
338
339 // --------------------------------------------------------------------
340
/// A Kafka Producer builder easing the process of setting up various
/// configuration settings.
///
/// Obtained via `Producer::from_client` or `Producer::from_hosts` and
/// finished with `Builder::create`.
pub struct Builder<P = DefaultPartitioner> {
    // An existing client to reuse; `None` when bootstrapping from `hosts`.
    client: Option<KafkaClient>,
    // Broker hosts used to create a new client when `client` is `None`.
    hosts: Vec<String>,
    compression: Compression,
    ack_timeout: Duration,
    conn_idle_timeout: Duration,
    required_acks: RequiredAcks,
    // The partitioner handed over to the resulting producer.
    partitioner: P,
    // Security settings; only consulted when `create` has to build a
    // new client (see `new_kafka_client`).
    security_config: Option<SecurityConfig>,
    // Optional client id applied to the client in `create`.
    client_id: Option<String>,
}
354
355 impl Builder {
new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder<DefaultPartitioner>356 fn new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder<DefaultPartitioner> {
357 let mut b = Builder {
358 client: client,
359 hosts: hosts,
360 compression: client::DEFAULT_COMPRESSION,
361 ack_timeout: Duration::from_millis(DEFAULT_ACK_TIMEOUT_MILLIS),
362 conn_idle_timeout: Duration::from_millis(
363 client::DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS,
364 ),
365 required_acks: DEFAULT_REQUIRED_ACKS,
366 partitioner: DefaultPartitioner::default(),
367 security_config: None,
368 client_id: None,
369 };
370 if let Some(ref c) = b.client {
371 b.compression = c.compression();
372 b.conn_idle_timeout = c.connection_idle_timeout();
373 }
374 b
375 }
376
377 /// Specifies the security config to use.
378 /// See `KafkaClient::new_secure` for more info.
379 #[cfg(feature = "security")]
with_security(mut self, security: SecurityConfig) -> Self380 pub fn with_security(mut self, security: SecurityConfig) -> Self {
381 self.security_config = Some(security);
382 self
383 }
384
385 /// Sets the compression algorithm to use when sending out data.
386 ///
387 /// See `KafkaClient::set_compression`.
with_compression(mut self, compression: Compression) -> Self388 pub fn with_compression(mut self, compression: Compression) -> Self {
389 self.compression = compression;
390 self
391 }
392
393 /// Sets the maximum time the kafka brokers can await the receipt
394 /// of required acknowledgements (which is specified through
395 /// `Builder::with_required_acks`.) Note that Kafka explicitely
396 /// documents this not to be a hard limit.
with_ack_timeout(mut self, timeout: Duration) -> Self397 pub fn with_ack_timeout(mut self, timeout: Duration) -> Self {
398 self.ack_timeout = timeout;
399 self
400 }
401
402 /// Specifies the timeout for idle connections.
403 /// See `KafkaClient::set_connection_idle_timeout`.
with_connection_idle_timeout(mut self, timeout: Duration) -> Self404 pub fn with_connection_idle_timeout(mut self, timeout: Duration) -> Self {
405 self.conn_idle_timeout = timeout;
406 self
407 }
408
409 /// Sets how many acknowledgements the kafka brokers should
410 /// receive before responding to sent messages.
411 ///
412 /// See `RequiredAcks`.
with_required_acks(mut self, acks: RequiredAcks) -> Self413 pub fn with_required_acks(mut self, acks: RequiredAcks) -> Self {
414 self.required_acks = acks;
415 self
416 }
417
418 /// Specifies a client_id to be sent along every request to Kafka
419 /// brokers. See `KafkaClient::set_client_id`.
with_client_id(mut self, client_id: String) -> Self420 pub fn with_client_id(mut self, client_id: String) -> Self {
421 self.client_id = Some(client_id);
422 self
423 }
424 }
425
426 impl<P> Builder<P> {
427 /// Sets the partitioner to dispatch when sending messages without
428 /// an explicit partition assignment.
with_partitioner<Q: Partitioner>(self, partitioner: Q) -> Builder<Q>429 pub fn with_partitioner<Q: Partitioner>(self, partitioner: Q) -> Builder<Q> {
430 Builder {
431 client: self.client,
432 hosts: self.hosts,
433 compression: self.compression,
434 ack_timeout: self.ack_timeout,
435 conn_idle_timeout: self.conn_idle_timeout,
436 required_acks: self.required_acks,
437 partitioner: partitioner,
438 security_config: None,
439 client_id: None,
440 }
441 }
442
443 #[cfg(not(feature = "security"))]
new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient444 fn new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient {
445 KafkaClient::new(hosts)
446 }
447
448 #[cfg(feature = "security")]
new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient449 fn new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient {
450 if let Some(security) = security {
451 KafkaClient::new_secure(hosts, security)
452 } else {
453 KafkaClient::new(hosts)
454 }
455 }
456
457 /// Finally creates/builds a new producer based on the so far
458 /// supplied settings.
create(self) -> Result<Producer<P>>459 pub fn create(self) -> Result<Producer<P>> {
460 // ~ create the client if necessary
461 let (mut client, need_metadata) = match self.client {
462 Some(client) => (client, false),
463 None => (Self::new_kafka_client(self.hosts, self.security_config), true),
464 };
465 // ~ apply configuration settings
466 client.set_compression(self.compression);
467 client.set_connection_idle_timeout(self.conn_idle_timeout);
468 if let Some(client_id) = self.client_id {
469 client.set_client_id(client_id);
470 }
471 let producer_config = Config {
472 ack_timeout: try!(protocol::to_millis_i32(self.ack_timeout)),
473 required_acks: self.required_acks as i16,
474 };
475 // ~ load metadata if necessary
476 if need_metadata {
477 try!(client.load_metadata_all());
478 }
479 // ~ create producer state
480 let state = try!(State::new(&mut client, self.partitioner));
481 Ok(Producer {
482 client: client,
483 state: state,
484 config: producer_config,
485 })
486 }
487 }
488
489 // --------------------------------------------------------------------
490
/// A description of available topics and their available partitions.
///
/// Intended for use by `Partitioner`s.
pub struct Topics<'a> {
    partitions: &'a HashMap<String, Partitions>,
}

/// Producer-relevant partition information of a particular topic.
///
/// Intended for use by `Partitioner`s.
pub struct Partitions {
    available_ids: Vec<i32>,
    num_all_partitions: u32,
}

impl Partitions {
    /// Retrieves the list of the identifiers of currently "available"
    /// partitions for the given topic. This list excludes partitions
    /// which do not have a leader broker assigned.
    #[inline]
    pub fn available_ids(&self) -> &[i32] {
        &self.available_ids
    }

    /// Retrieves the number of "available" partitions. This is merely
    /// a convenience method. See `Partitions::available_ids`.
    #[inline]
    pub fn num_available(&self) -> u32 {
        self.available_ids.len() as u32
    }

    /// The total number of partitions of the underlying topic. This
    /// number also includes partitions without a current leader
    /// assignment.
    #[inline]
    pub fn num_all(&self) -> u32 {
        self.num_all_partitions
    }
}

impl<'a> Topics<'a> {
    fn new(partitions: &'a HashMap<String, Partitions>) -> Topics<'a> {
        Topics { partitions: partitions }
    }

    /// Retrieves information about a topic's partitions, or `None` if
    /// the topic is unknown.
    ///
    /// The returned reference borrows from the underlying topic map
    /// (lifetime `'a`), not from this `Topics` value, so it may outlive
    /// the `Topics` wrapper itself.
    #[inline]
    pub fn partitions(&self, topic: &str) -> Option<&'a Partitions> {
        self.partitions.get(topic)
    }
}
542
/// A partitioner is given a chance to choose/redefine a partition for
/// a message to be sent to Kafka. See also
/// `Record::with_partition`.
///
/// `Producer::send_all` invokes `partition` once for every record
/// before the records are handed to the client.
///
/// Implementations can be stateful.
pub trait Partitioner {
    /// Inspects the given message and, if desired, re-assigns the
    /// message's target partition.
    ///
    /// `topics` describes the topics known to the producer (as captured
    /// when it was created) and their available partitions.
    ///
    /// `msg` is the message whose partition assignment may be changed.
    fn partition(&mut self, topics: Topics, msg: &mut client::ProduceMessage);
}
559
/// The default hasher used by `DefaultPartitioner` to hash message
/// keys (`twox_hash::XxHash32`).
pub type DefaultHasher = XxHash32;
562
/// As its name implies, `DefaultPartitioner` is the default
/// partitioner for `Producer`.
///
/// For every message it proceeds as follows:
///
/// - If the message contains a non-negative partition value, it
/// leaves the message untouched. This will cause `Producer` to try
/// to send the message to exactly that partition.
///
/// - Otherwise, if the message has an "unspecified" `partition`
/// (that is, a negative partition value) and a specified key,
/// `DefaultPartitioner` will compute a hash from the key using the
/// underlying hasher and take `hash % num_all_partitions` to derive
/// the partition to send the message to. This will consistently
/// cause messages with the same key to be sent to the same partition.
///
/// - Otherwise (a message with an "unspecified" `partition` and no
/// key), `DefaultPartitioner` will pick one of the "available"
/// partitions, distributing such messages across them in a round
/// robin fashion. "Available" in this context means partitions with
/// a known leader. Note that `Producer` passes empty keys as absent,
/// so such messages are treated as key-less.
///
/// Messages for topics unknown to the producer are left untouched.
///
/// This behavior may not suffice for every workload. If your
/// application depends on a particular distribution scheme different
/// from the one outlined above, you want to provide your own
/// partitioner to the `Producer` at its initialization time.
///
/// See `Builder::with_partitioner`.
#[derive(Default)]
pub struct DefaultPartitioner<H = BuildHasherDefault<DefaultHasher>> {
    // ~ a hasher builder; used to consistently hash keys
    hash_builder: H,
    // ~ a counter incremented for every key-less message that gets an
    // available partition assigned; it is shared across all topics
    // and drives the round robin selection
    cntr: u32,
}
601
602 impl DefaultPartitioner {
603 /// Creates a new partitioner which will use the given hash
604 /// builder to hash message keys.
with_hasher<B: BuildHasher>(hash_builder: B) -> DefaultPartitioner<B>605 pub fn with_hasher<B: BuildHasher>(hash_builder: B) -> DefaultPartitioner<B> {
606 DefaultPartitioner {
607 hash_builder: hash_builder.into(),
608 cntr: 0,
609 }
610 }
611
with_default_hasher<B>() -> DefaultPartitioner<BuildHasherDefault<B>> where B: Hasher + Default,612 pub fn with_default_hasher<B>() -> DefaultPartitioner<BuildHasherDefault<B>>
613 where
614 B: Hasher + Default,
615 {
616 DefaultPartitioner {
617 hash_builder: BuildHasherDefault::<B>::default(),
618 cntr: 0,
619 }
620 }
621 }
622
623 impl<H: BuildHasher> Partitioner for DefaultPartitioner<H> {
624 #[allow(unused_variables)]
partition(&mut self, topics: Topics, rec: &mut client::ProduceMessage)625 fn partition(&mut self, topics: Topics, rec: &mut client::ProduceMessage) {
626 if rec.partition >= 0 {
627 // ~ partition explicitely defined, trust the user
628 return;
629 }
630 let partitions = match topics.partitions(rec.topic) {
631 None => return, // ~ unknown topic, this is not the place to deal with it.
632 Some(partitions) => partitions,
633 };
634 match rec.key {
635 Some(key) => {
636 let num_partitions = partitions.num_all();
637 if num_partitions == 0 {
638 // ~ no partitions at all ... a rather strange
639 // topic. again, this is not the right place to
640 // deal with it.
641 return;
642 }
643 let mut h = self.hash_builder.build_hasher();
644 h.write(key);
645 // ~ unconditionally dispatch to partitions no matter
646 // whether they are currently available or not. this
647 // guarantees consistency which is the point of
648 // partitioning by key. other behaviour - if desired
649 // - can be implemented in custom, user provided
650 // partitioners.
651 let hash = h.finish() as u32;
652 // if `num_partitions == u32::MAX` this can lead to a
653 // negative partition ... such a partition count is very
654 // unlikely though
655 rec.partition = (hash % num_partitions) as i32;
656 }
657 None => {
658 // ~ no key available, determine a partition from the
659 // available ones.
660 let avail = partitions.available_ids();
661 if avail.len() > 0 {
662 rec.partition = avail[self.cntr as usize % avail.len()];
663 // ~ update internal state so that the next time we choose
664 // a different partition
665 self.cntr = self.cntr.wrapping_add(1);
666 }
667 }
668 }
669 }
670 }
671
672 // --------------------------------------------------------------------
673
#[cfg(test)]
mod default_partitioner_tests {
    use std::hash::{Hasher, BuildHasherDefault};
    use std::collections::HashMap;

    use client;
    use super::{DefaultPartitioner, DefaultHasher, Partitioner, Partitions, Topics};

    /// Builds the topic-name -> partition-info map that `Topics` wraps.
    fn topics_map(topics: Vec<(&str, Partitions)>) -> HashMap<String, Partitions> {
        let mut h = HashMap::new();
        for topic in topics {
            h.insert(topic.0.into(), topic.1);
        }
        h
    }

    /// Partitions a keyed message with an unspecified partition, asserts
    /// that the result lies within `0..num_all_partitions` of `topic`,
    /// and returns it.
    fn assert_partitioning<P: Partitioner>(
        topics: &HashMap<String, Partitions>,
        p: &mut P,
        topic: &str,
        key: &str,
    ) -> i32 {
        let mut msg = client::ProduceMessage {
            key: Some(key.as_bytes()),
            value: None,
            topic: topic,
            partition: -1,
        };
        p.partition(Topics::new(topics), &mut msg);
        let num_partitions = topics.get(topic).unwrap().num_all_partitions as i32;
        assert!(msg.partition >= 0 && msg.partition < num_partitions);
        msg.partition
    }

    /// Validate consistent partitioning on a message's key
    #[test]
    fn test_key_partitioning() {
        let h = topics_map(vec![
            (
                "foo",
                Partitions {
                    available_ids: vec![0, 1, 4],
                    num_all_partitions: 5,
                }
            ),
            (
                "bar",
                Partitions {
                    available_ids: vec![0, 1],
                    num_all_partitions: 2,
                }
            ),
        ]);

        let mut p: DefaultPartitioner<BuildHasherDefault<DefaultHasher>> = Default::default();

        // ~ validate that partitioning by the same key leads to the same
        // partition
        let h1 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        let h2 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        assert_eq!(h1, h2);

        // ~ validate that partitioning by different keys leads to
        // different partitions (the keys are chosen such that they lead
        // to different partitions)
        let h3 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        let h4 = assert_partitioning(&h, &mut p, "foo", "bar-key");
        assert!(h3 != h4);
    }

    /// Validate that a message with an explicit partition is left
    /// untouched, even if it carries a key.
    #[test]
    fn test_explicit_partition_is_kept() {
        let h = topics_map(vec![
            (
                "foo",
                Partitions {
                    available_ids: vec![0, 1],
                    num_all_partitions: 2,
                }
            ),
        ]);
        let mut p: DefaultPartitioner<BuildHasherDefault<DefaultHasher>> = Default::default();

        let mut msg = client::ProduceMessage {
            key: Some("foo-key".as_bytes()),
            value: None,
            topic: "foo",
            partition: 7,
        };
        p.partition(Topics::new(&h), &mut msg);
        assert_eq!(7, msg.partition);
    }

    /// Validate that messages for unknown topics keep their
    /// "unspecified" partition.
    #[test]
    fn test_unknown_topic_is_left_unassigned() {
        let h = topics_map(vec![
            (
                "foo",
                Partitions {
                    available_ids: vec![0, 1],
                    num_all_partitions: 2,
                }
            ),
        ]);
        let mut p: DefaultPartitioner<BuildHasherDefault<DefaultHasher>> = Default::default();

        let mut msg = client::ProduceMessage {
            key: None,
            value: None,
            topic: "bar",
            partition: -1,
        };
        p.partition(Topics::new(&h), &mut msg);
        assert_eq!(-1, msg.partition);
    }

    /// Validate that key-less messages are distributed round robin
    /// across the available partitions only.
    #[test]
    fn test_keyless_round_robin() {
        let h = topics_map(vec![
            (
                "foo",
                Partitions {
                    available_ids: vec![0, 2, 5],
                    num_all_partitions: 6,
                }
            ),
        ]);
        let mut p: DefaultPartitioner<BuildHasherDefault<DefaultHasher>> = Default::default();

        let mut assigned = Vec::new();
        for _ in 0..6 {
            let mut msg = client::ProduceMessage {
                key: None,
                value: None,
                topic: "foo",
                partition: -1,
            };
            p.partition(Topics::new(&h), &mut msg);
            assigned.push(msg.partition);
        }
        assert_eq!(vec![0, 2, 5, 0, 2, 5], assigned);
    }

    /// A trivial hasher whose hash is the first byte written to it.
    #[derive(Default)]
    struct MyCustomHasher(u64);

    impl Hasher for MyCustomHasher {
        fn finish(&self) -> u64 {
            self.0
        }
        fn write(&mut self, bytes: &[u8]) {
            self.0 = bytes[0] as u64;
        }
    }

    /// Validate it is possible to register a custom hasher with the
    /// default partitioner
    #[test]
    fn default_partitioner_with_custom_hasher_default() {
        // this must compile
        let mut p = DefaultPartitioner::with_default_hasher::<MyCustomHasher>();

        let h = topics_map(vec![
            (
                "confirms",
                Partitions {
                    available_ids: vec![0, 1],
                    num_all_partitions: 2,
                }
            ),
            (
                "contents",
                Partitions {
                    available_ids: vec![0, 1, 9],
                    num_all_partitions: 10,
                }
            ),
        ]);

        // verify also the partitioner derives the correct partition
        // ... this is hash modulo num_all_partitions. here it is a
        // topic with a total of 2 partitions.
        let p1 = assert_partitioning(&h, &mut p, "confirms", "A" /* ascii: 65 */);
        assert_eq!(1, p1);

        // here it is a topic with a total of 10 partitions
        let p2 = assert_partitioning(&h, &mut p, "contents", "B" /* ascii: 66 */);
        assert_eq!(6, p2);
    }
}
791