1 //! Message publication 2 use futures::channel::oneshot; 3 use futures::future::try_join_all; 4 use std::collections::{BTreeMap, HashMap, VecDeque}; 5 use std::io::Write; 6 use std::sync::{Arc, Mutex}; 7 8 use crate::client::SerializeMessage; 9 use crate::connection::{Connection, SerialId}; 10 use crate::error::{ConnectionError, ProducerError}; 11 use crate::executor::Executor; 12 use crate::message::proto::{self, CommandSendReceipt, CompressionType, EncryptionKeys, Schema}; 13 use crate::message::BatchedMessage; 14 use crate::{Error, Pulsar}; 15 use futures::task::{Context, Poll}; 16 use futures::Future; 17 use tokio::macros::support::Pin; 18 19 type ProducerId = u64; 20 type ProducerName = String; 21 22 pub struct SendFuture(pub(crate) oneshot::Receiver<Result<CommandSendReceipt, Error>>); 23 24 impl Future for SendFuture { 25 type Output = Result<CommandSendReceipt, Error>; 26 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>27 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 28 match Pin::new(&mut self.0).poll(cx) { 29 Poll::Ready(Ok(r)) => Poll::Ready(r), 30 Poll::Ready(Err(_)) => Poll::Ready(Err(ProducerError::Custom( 31 "producer unexpectedly disconnected".into(), 32 ) 33 .into())), 34 Poll::Pending => Poll::Pending, 35 } 36 } 37 } 38 39 /// message data that will be sent on a topic 40 /// 41 /// generated from the [SerializeMessage] trait or [MessageBuilder] 42 /// 43 /// this is actually a subset of the fields of a message, because batching, 44 /// compression and encryption should be handled by the producer 45 #[derive(Debug, Clone, Default)] 46 pub struct Message { 47 /// serialized data 48 pub payload: Vec<u8>, 49 pub properties: HashMap<String, String>, 50 /// key to decide partition for the msg 51 pub partition_key: ::std::option::Option<String>, 52 /// Override namespace's replication 53 pub replicate_to: ::std::vec::Vec<String>, 54 /// the timestamp that this event occurs. it is typically set by applications. 55 /// if this field is omitted, `publish_time` can be used for the purpose of `event_time`. 56 pub event_time: ::std::option::Option<u64>, 57 pub schema_version: ::std::option::Option<Vec<u8>>, 58 } 59 60 /// internal message type carrying options that must be defined 61 /// by the producer 62 #[derive(Debug, Clone, Default)] 63 pub(crate) struct ProducerMessage { 64 pub payload: Vec<u8>, 65 pub properties: HashMap<String, String>, 66 ///key to decide partition for the msg 67 pub partition_key: ::std::option::Option<String>, 68 /// Override namespace's replication 69 pub replicate_to: ::std::vec::Vec<String>, 70 pub compression: ::std::option::Option<i32>, 71 pub uncompressed_size: ::std::option::Option<u32>, 72 /// Removed below checksum field from Metadata as 73 /// it should be part of send-command which keeps checksum of header + payload 74 ///optional sfixed64 checksum = 10; 75 ///differentiate single and batch message metadata 76 pub num_messages_in_batch: ::std::option::Option<i32>, 77 pub event_time: ::std::option::Option<u64>, 78 /// Contains encryption key name, encrypted key and metadata to describe the key 79 pub encryption_keys: ::std::vec::Vec<EncryptionKeys>, 80 /// Algorithm used to encrypt data key 81 pub encryption_algo: ::std::option::Option<String>, 82 /// Additional parameters required by encryption 83 pub encryption_param: ::std::option::Option<Vec<u8>>, 84 pub schema_version: ::std::option::Option<Vec<u8>>, 85 } 86 87 impl From<Message> for ProducerMessage { from(m: Message) -> Self88 fn from(m: Message) -> Self { 89 ProducerMessage { 90 payload: m.payload, 91 properties: m.properties, 92 partition_key: m.partition_key, 93 replicate_to: m.replicate_to, 94 event_time: m.event_time, 95 schema_version: m.schema_version, 96 ..Default::default() 97 } 98 } 99 } 100 101 /// Configuration options for producers 102 #[derive(Clone, Default)] 103 pub struct ProducerOptions { 104 pub encrypted: Option<bool>, 105 pub metadata: BTreeMap<String, String>, 106 pub schema: Option<Schema>, 107 pub batch_size: Option<u32>, 108 pub compression: Option<proto::CompressionType>, 109 } 110 111 /// Wrapper structure that manges multiple producers at once, creating them as needed 112 /// ```rust,no_run 113 /// use pulsar::{Pulsar, TokioExecutor}; 114 /// 115 /// # async fn test() -> Result<(), pulsar::Error> { 116 /// # let addr = "pulsar://127.0.0.1:6650"; 117 /// # let topic = "topic"; 118 /// # let message = "data".to_owned(); 119 /// let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?; 120 /// let mut producer = pulsar.producer() 121 /// .with_name("name") 122 /// .build_multi_topic(); 123 /// let send_1 = producer.send(topic, &message).await?; 124 /// let send_2 = producer.send(topic, &message).await?; 125 /// send_1.await?; 126 /// send_2.await?; 127 /// # Ok(()) 128 /// # } 129 /// ``` 130 pub struct MultiTopicProducer<Exe: Executor> { 131 client: Pulsar<Exe>, 132 producers: BTreeMap<String, Producer<Exe>>, 133 options: ProducerOptions, 134 name: Option<String>, 135 } 136 137 impl<Exe: Executor> MultiTopicProducer<Exe> { options(&self) -> &ProducerOptions138 pub fn options(&self) -> &ProducerOptions { 139 &self.options 140 } 141 topics(&self) -> Vec<String>142 pub fn topics(&self) -> Vec<String> { 143 self.producers.keys().cloned().collect() 144 } 145 close_producer<S: Into<String>>(&mut self, topic: S) -> Result<(), Error>146 pub async fn close_producer<S: Into<String>>(&mut self, topic: S) -> Result<(), Error> { 147 let partitions = self.client.lookup_partitioned_topic(topic).await?; 148 for (topic, _) in partitions { 149 self.producers.remove(&topic); 150 } 151 Ok(()) 152 } 153 send<T: SerializeMessage + Sized, S: Into<String>>( &mut self, topic: S, message: T, ) -> Result<SendFuture, Error>154 pub async fn send<T: SerializeMessage + Sized, S: Into<String>>( 155 &mut self, 156 topic: S, 157 message: T, 158 ) -> Result<SendFuture, Error> { 159 let message = T::serialize_message(message)?; 160 let topic = topic.into(); 161 if !self.producers.contains_key(&topic) { 162 let mut builder = self 163 .client 164 .producer() 165 .with_topic(&topic) 166 .with_options(self.options.clone()); 167 if let Some(name) = &self.name { 168 builder = builder.with_name(name.clone()); 169 } 170 let producer = builder 171 .build() 172 .await?; 173 self.producers.insert(topic.clone(), producer); 174 } 175 176 let producer = self.producers.get_mut(&topic).unwrap(); 177 producer.send(message).await 178 } 179 send_all<'a, 'b, T, S, I>( &mut self, topic: S, messages: I, ) -> Result<Vec<SendFuture>, Error> where 'b: 'a, T: 'b + SerializeMessage + Sized, I: IntoIterator<Item = T>, S: Into<String>,180 pub async fn send_all<'a, 'b, T, S, I>( 181 &mut self, 182 topic: S, 183 messages: I, 184 ) -> Result<Vec<SendFuture>, Error> 185 where 186 'b: 'a, 187 T: 'b + SerializeMessage + Sized, 188 I: IntoIterator<Item = T>, 189 S: Into<String>, 190 { 191 let topic = topic.into(); 192 let mut sends = Vec::new(); 193 for msg in messages { 194 sends.push(self.send(&topic, msg).await); 195 } 196 // TODO determine whether to keep this approach or go with the partial send, but more mem friendly lazy approach. 197 // serialize all messages before sending to avoid a partial send 198 if sends.iter().all(|s| s.is_ok()) { 199 Ok(sends.into_iter().map(|s| s.unwrap()).collect()) 200 } else { 201 Err(ProducerError::PartialSend(sends).into()) 202 } 203 } 204 } 205 206 pub struct Producer<Exe: Executor> { 207 inner: ProducerInner<Exe>, 208 } 209 210 impl<Exe: Executor> Producer<Exe> { builder(pulsar: &Pulsar<Exe>) -> ProducerBuilder<Exe>211 pub fn builder(pulsar: &Pulsar<Exe>) -> ProducerBuilder<Exe> { 212 ProducerBuilder::new(&pulsar) 213 } 214 topic(&self) -> &str215 pub fn topic(&self) -> &str { 216 match &self.inner { 217 ProducerInner::Single(p) => p.topic(), 218 ProducerInner::Partitioned(p) => &p.topic, 219 } 220 } 221 partitions(&self) -> Option<Vec<String>>222 pub fn partitions(&self) -> Option<Vec<String>> { 223 match &self.inner { 224 ProducerInner::Single(_) => None, 225 ProducerInner::Partitioned(p) => { 226 Some( 227 p.producers.iter() 228 .map(|p| p.topic().to_owned()) 229 .collect() 230 ) 231 } 232 } 233 } 234 options(&self) -> &ProducerOptions235 pub fn options(&self) -> &ProducerOptions { 236 match &self.inner { 237 ProducerInner::Single(p) => p.options(), 238 ProducerInner::Partitioned(p) => &p.options, 239 } 240 } 241 create_message(&mut self) -> MessageBuilder<(), Exe>242 pub fn create_message(&mut self) -> MessageBuilder<(), Exe> { 243 MessageBuilder::new(self) 244 } 245 check_connection(&self) -> Result<(), Error>246 pub async fn check_connection(&self) -> Result<(), Error> { 247 match &self.inner { 248 ProducerInner::Single(p) => p.check_connection().await, 249 ProducerInner::Partitioned(p) => { 250 try_join_all(p.producers.iter().map(|p| p.check_connection())) 251 .await 252 .map(drop) 253 } 254 } 255 } 256 257 /// Sends a message 258 /// 259 /// this function returns a `SendFuture` because the receipt can come long after 260 /// this function was called, for various reasons: 261 /// - the message was sent successfully but Pulsar did not send the receipt yet 262 /// - the producer is batching messages, so this function must return immediately, 263 /// and the receipt will come when the batched messages are actually sent 264 /// 265 /// Usage: 266 /// 267 /// ```rust,no_run 268 /// # async fn run(mut producer: pulsar::Producer<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> { 269 /// let f1 = producer.send("hello").await?; 270 /// let f2 = producer.send("world").await?; 271 /// let receipt1 = f1.await?; 272 /// let receipt2 = f2.await?; 273 /// # Ok(()) 274 /// # } 275 /// ``` send<T: SerializeMessage + Sized>( &mut self, message: T, ) -> Result<SendFuture, Error>276 pub async fn send<T: SerializeMessage + Sized>( 277 &mut self, 278 message: T, 279 ) -> Result<SendFuture, Error> { 280 match &mut self.inner { 281 ProducerInner::Single(p) => p.send(message).await, 282 ProducerInner::Partitioned(p) => p.next().send(message).await, 283 } 284 } 285 send_all<T, I>(&mut self, messages: I) -> Result<Vec<SendFuture>, Error> where T: SerializeMessage, I: IntoIterator<Item = T>,286 pub async fn send_all<T, I>(&mut self, messages: I) -> Result<Vec<SendFuture>, Error> 287 where 288 T: SerializeMessage, 289 I: IntoIterator<Item = T>, 290 { 291 let producer = match &mut self.inner { 292 ProducerInner::Single(p) => p, 293 ProducerInner::Partitioned(p) => p.next(), 294 }; 295 let mut sends = Vec::new(); 296 for message in messages { 297 sends.push(producer.send(message).await); 298 } 299 if sends.iter().all(|s| s.is_ok()) { 300 Ok(sends.into_iter().map(|s| s.unwrap()).collect()) 301 } else { 302 Err(ProducerError::PartialSend(sends).into()) 303 } 304 } 305 306 /// sends the current batch of messages send_batch(&mut self) -> Result<(), Error>307 pub async fn send_batch(&mut self) -> Result<(), Error> { 308 match &mut self.inner { 309 ProducerInner::Single(p) => p.send_batch().await, 310 ProducerInner::Partitioned(p) => { 311 try_join_all(p.producers.iter_mut().map(|p| p.send_batch())) 312 .await 313 .map(drop) 314 } 315 } 316 } 317 send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error>318 pub(crate) async fn send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error> { 319 match &mut self.inner { 320 ProducerInner::Single(p) => p.send_raw(message).await, 321 ProducerInner::Partitioned(p) => p.next().send_raw(message).await, 322 } 323 } 324 } 325 326 enum ProducerInner<Exe: Executor> { 327 Single(TopicProducer<Exe>), 328 Partitioned(PartitionedProducer<Exe>), 329 } 330 331 struct PartitionedProducer<Exe: Executor> { 332 // Guaranteed to be non-empty 333 producers: VecDeque<TopicProducer<Exe>>, 334 topic: String, 335 options: ProducerOptions, 336 } 337 338 impl<Exe: Executor> PartitionedProducer<Exe> { next(&mut self) -> &mut TopicProducer<Exe>339 pub fn next(&mut self) -> &mut TopicProducer<Exe> { 340 self.producers.rotate_left(1); 341 self.producers.front_mut().unwrap() 342 } 343 } 344 345 /// a producer is used to publish messages on a topic 346 struct TopicProducer<Exe: Executor> { 347 client: Pulsar<Exe>, 348 connection: Arc<Connection>, 349 id: ProducerId, 350 name: ProducerName, 351 topic: String, 352 message_id: SerialId, 353 //putting it in a mutex because we must send multiple messages at once 354 // while we might be pushing more messages from elsewhere 355 batch: Option<Mutex<Batch>>, 356 compression: Option<proto::CompressionType>, 357 _drop_signal: oneshot::Sender<()>, 358 options: ProducerOptions, 359 } 360 361 impl<Exe: Executor> TopicProducer<Exe> { from_connection<S: Into<String>>( client: Pulsar<Exe>, connection: Arc<Connection>, topic: S, name: Option<String>, options: ProducerOptions, ) -> Result<Self, Error>362 pub(crate) async fn from_connection<S: Into<String>>( 363 client: Pulsar<Exe>, 364 connection: Arc<Connection>, 365 topic: S, 366 name: Option<String>, 367 options: ProducerOptions, 368 ) -> Result<Self, Error> { 369 let topic = topic.into(); 370 let producer_id = rand::random(); 371 let sequence_ids = SerialId::new(); 372 373 let _ = connection 374 .sender() 375 .lookup_topic(topic.clone(), false) 376 .await?; 377 378 let topic = topic.clone(); 379 let batch_size = options.batch_size; 380 let compression = options.compression; 381 382 match compression { 383 None | Some(CompressionType::None) => {} 384 Some(CompressionType::Lz4) => { 385 #[cfg(not(feature = "lz4"))] 386 return Err(Error::Custom("cannot create a producer with LZ4 compression because the 'lz4' cargo feature is not active".to_string())); 387 } 388 Some(CompressionType::Zlib) => { 389 #[cfg(not(feature = "flate2"))] 390 return Err(Error::Custom("cannot create a producer with zlib compression because the 'flate2' cargo feature is not active".to_string())); 391 } 392 Some(CompressionType::Zstd) => { 393 #[cfg(not(feature = "zstd"))] 394 return Err(Error::Custom("cannot create a producer with zstd compression because the 'zstd' cargo feature is not active".to_string())); 395 } 396 Some(CompressionType::Snappy) => { 397 #[cfg(not(feature = "snap"))] 398 return Err(Error::Custom("cannot create a producer with Snappy compression because the 'snap' cargo feature is not active".to_string())); 399 } //Some() => unimplemented!(), 400 }; 401 402 let success = connection 403 .sender() 404 .create_producer(topic.clone(), producer_id, name, options.clone()) 405 .await?; 406 407 // drop_signal will be dropped when the TopicProducer is dropped, then 408 // drop_receiver will return, and we can close the producer 409 let (_drop_signal, drop_receiver) = oneshot::channel::<()>(); 410 let conn = connection.clone(); 411 let _ = client.executor.spawn(Box::pin(async move { 412 let _res = drop_receiver.await; 413 let _ = conn.sender().close_producer(producer_id).await; 414 })); 415 416 Ok(TopicProducer { 417 client, 418 connection, 419 id: producer_id, 420 name: success.producer_name, 421 topic, 422 message_id: sequence_ids, 423 batch: batch_size.map(Batch::new).map(Mutex::new), 424 compression, 425 _drop_signal, 426 options, 427 }) 428 } 429 topic(&self) -> &str430 pub fn topic(&self) -> &str { 431 &self.topic 432 } 433 options(&self) -> &ProducerOptions434 pub fn options(&self) -> &ProducerOptions { 435 &self.options 436 } 437 check_connection(&self) -> Result<(), Error>438 pub async fn check_connection(&self) -> Result<(), Error> { 439 self.connection.sender().send_ping().await?; 440 Ok(()) 441 } 442 send<T: SerializeMessage + Sized>( &mut self, message: T, ) -> Result<SendFuture, Error>443 pub async fn send<T: SerializeMessage + Sized>( 444 &mut self, 445 message: T, 446 ) -> Result<SendFuture, Error> { 447 match T::serialize_message(message) { 448 Ok(message) => self.send_raw(message.into()).await, 449 Err(e) => Err(e), 450 } 451 } 452 send_batch(&mut self) -> Result<(), Error>453 pub async fn send_batch(&mut self) -> Result<(), Error> { 454 match self.batch.as_ref() { 455 None => Err(ProducerError::Custom("not a batching producer".to_string()).into()), 456 Some(batch) => { 457 let mut payload: Vec<u8> = Vec::new(); 458 let mut receipts = Vec::new(); 459 let message_count; 460 461 { 462 let batch = batch.lock().unwrap(); 463 let messages = batch.get_messages(); 464 message_count = messages.len(); 465 for (tx, message) in messages { 466 receipts.push(tx); 467 message.serialize(&mut payload); 468 } 469 } 470 471 if message_count == 0 { 472 return Ok(()); 473 } 474 475 let message = ProducerMessage { 476 payload, 477 num_messages_in_batch: Some(message_count as i32), 478 ..Default::default() 479 }; 480 481 trace!("sending a batched message of size {}", message_count); 482 let send_receipt = self.send_compress(message).await.map_err(|e| Arc::new(e)); 483 for resolver in receipts { 484 let _ = resolver.send( 485 send_receipt 486 .clone() 487 .map_err(|e| ProducerError::Batch(e).into()), 488 ); 489 } 490 491 Ok(()) 492 } 493 } 494 } 495 send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error>496 pub(crate) async fn send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error> { 497 let (tx, rx) = oneshot::channel(); 498 match self.batch.as_ref() { 499 None => { 500 let receipt = self.send_compress(message).await?; 501 let _ = tx.send(Ok(receipt)); 502 Ok(SendFuture(rx)) 503 } 504 Some(batch) => { 505 let mut payload: Vec<u8> = Vec::new(); 506 let mut receipts = Vec::new(); 507 let mut counter = 0i32; 508 509 { 510 let batch = batch.lock().unwrap(); 511 batch.push_back((tx, message)); 512 513 if batch.is_full() { 514 for (tx, message) in batch.get_messages() { 515 receipts.push(tx); 516 message.serialize(&mut payload); 517 counter += 1; 518 } 519 } 520 } 521 522 if counter > 0 { 523 let message = ProducerMessage { 524 payload, 525 num_messages_in_batch: Some(counter), 526 ..Default::default() 527 }; 528 529 let send_receipt = self.send_compress(message).await.map_err(|e| Arc::new(e)); 530 531 trace!("sending a batched message of size {}", counter); 532 for tx in receipts.drain(..) { 533 let _ = tx.send( 534 send_receipt 535 .clone() 536 .map_err(|e| ProducerError::Batch(e).into()), 537 ); 538 } 539 } 540 541 Ok(SendFuture(rx)) 542 } 543 } 544 } 545 send_compress( &mut self, mut message: ProducerMessage, ) -> Result<proto::CommandSendReceipt, Error>546 async fn send_compress( 547 &mut self, 548 mut message: ProducerMessage, 549 ) -> Result<proto::CommandSendReceipt, Error> { 550 let compressed_message = match self.compression { 551 None | Some(CompressionType::None) => message, 552 Some(CompressionType::Lz4) => { 553 #[cfg(not(feature = "lz4"))] 554 return unimplemented!(); 555 556 #[cfg(feature = "lz4")] 557 { 558 let v: Vec<u8> = Vec::new(); 559 let mut encoder = lz4::EncoderBuilder::new() 560 .build(v) 561 .map_err(ProducerError::Io)?; 562 encoder 563 .write(&message.payload[..]) 564 .map_err(ProducerError::Io)?; 565 let (compressed_payload, result) = encoder.finish(); 566 567 result.map_err(ProducerError::Io)?; 568 message.payload = compressed_payload; 569 message.compression = Some(1); 570 message 571 } 572 } 573 Some(CompressionType::Zlib) => { 574 #[cfg(not(feature = "flate2"))] 575 return unimplemented!(); 576 577 #[cfg(feature = "flate2")] 578 { 579 let mut e = 580 flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default()); 581 e.write_all(&message.payload[..]) 582 .map_err(ProducerError::Io)?; 583 let compressed_payload = e.finish().map_err(ProducerError::Io)?; 584 585 message.payload = compressed_payload; 586 message.compression = Some(2); 587 message 588 } 589 } 590 Some(CompressionType::Zstd) => { 591 #[cfg(not(feature = "zstd"))] 592 return unimplemented!(); 593 594 #[cfg(feature = "zstd")] 595 { 596 let compressed_payload = 597 zstd::encode_all(&message.payload[..], 0).map_err(ProducerError::Io)?; 598 message.compression = Some(3); 599 message.payload = compressed_payload; 600 message 601 } 602 } 603 Some(CompressionType::Snappy) => { 604 #[cfg(not(feature = "snap"))] 605 return unimplemented!(); 606 607 #[cfg(feature = "snap")] 608 { 609 let compressed_payload: Vec<u8> = Vec::new(); 610 let mut encoder = snap::write::FrameEncoder::new(compressed_payload); 611 encoder 612 .write(&message.payload[..]) 613 .map_err(ProducerError::Io)?; 614 let compressed_payload = encoder 615 .into_inner() 616 //FIXME 617 .map_err(|e| { 618 std::io::Error::new( 619 std::io::ErrorKind::Other, 620 format!("Snappy compression error: {:?}", e), 621 ) 622 }) 623 .map_err(ProducerError::Io)?; 624 625 message.payload = compressed_payload; 626 message.compression = Some(4); 627 message 628 } 629 } 630 }; 631 632 self.send_inner(compressed_message).await 633 } 634 send_inner( &mut self, message: ProducerMessage, ) -> Result<proto::CommandSendReceipt, Error>635 async fn send_inner( 636 &mut self, 637 message: ProducerMessage, 638 ) -> Result<proto::CommandSendReceipt, Error> { 639 let msg = message.clone(); 640 match self 641 .connection 642 .sender() 643 .send(self.id, self.name.clone(), self.message_id.get(), message) 644 .await 645 { 646 Ok(receipt) => return Ok(receipt), 647 Err(ConnectionError::Disconnected) => {} 648 Err(e) => { 649 error!("send_inner got error: {:?}", e); 650 return Err(ProducerError::Connection(e).into()); 651 } 652 }; 653 654 error!("send_inner disconnected"); 655 self.reconnect().await?; 656 657 match self 658 .connection 659 .sender() 660 .send(self.id, self.name.clone(), self.message_id.get(), msg) 661 .await 662 { 663 Ok(receipt) => Ok(receipt), 664 Err(e) => { 665 error!("send_inner got error: {:?}", e); 666 Err(ProducerError::Connection(e).into()) 667 } 668 } 669 } 670 reconnect(&mut self) -> Result<(), Error>671 async fn reconnect(&mut self) -> Result<(), Error> { 672 debug!("reconnecting producer for topic: {}", self.topic); 673 let broker_address = self.client.lookup_topic(&self.topic).await?; 674 let conn = self.client.manager.get_connection(&broker_address).await?; 675 676 self.connection = conn; 677 678 let topic = self.topic.clone(); 679 let batch_size = self.options.batch_size; 680 681 let _ = self 682 .connection 683 .sender() 684 .create_producer( 685 topic.clone(), 686 self.id, 687 Some(self.name.clone()), 688 self.options.clone(), 689 ) 690 .await?; 691 692 // drop_signal will be dropped when the TopicProducer is dropped, then 693 // drop_receiver will return, and we can close the producer 694 let (_drop_signal, drop_receiver) = oneshot::channel::<()>(); 695 let batch = batch_size.map(Batch::new).map(Mutex::new); 696 let conn = self.connection.clone(); 697 let producer_id = self.id; 698 let _ = self.client.executor.spawn(Box::pin(async move { 699 let _res = drop_receiver.await; 700 let _ = conn.sender().close_producer(producer_id).await; 701 })); 702 703 self.batch = batch; 704 self._drop_signal = _drop_signal; 705 706 Ok(()) 707 } 708 } 709 710 /// Helper structure to prepare a producer 711 /// 712 /// generated from [Pulsar::producer] 713 #[derive(Clone)] 714 pub struct ProducerBuilder<Exe: Executor> { 715 pulsar: Pulsar<Exe>, 716 topic: Option<String>, 717 name: Option<String>, 718 producer_options: Option<ProducerOptions>, 719 } 720 721 impl<Exe: Executor> ProducerBuilder<Exe> { new(pulsar: &Pulsar<Exe>) -> Self722 pub fn new(pulsar: &Pulsar<Exe>) -> Self { 723 ProducerBuilder { 724 pulsar: pulsar.clone(), 725 topic: None, 726 name: None, 727 producer_options: None, 728 } 729 } 730 with_topic<S: Into<String>>(mut self, topic: S) -> Self731 pub fn with_topic<S: Into<String>>(mut self, topic: S) -> Self { 732 self.topic = Some(topic.into()); 733 self 734 } 735 with_name<S: Into<String>>(mut self, name: S) -> Self736 pub fn with_name<S: Into<String>>(mut self, name: S) -> Self { 737 self.name = Some(name.into()); 738 self 739 } 740 with_options(mut self, options: ProducerOptions) -> Self741 pub fn with_options(mut self, options: ProducerOptions) -> Self { 742 self.producer_options = Some(options); 743 self 744 } 745 build(self) -> Result<Producer<Exe>, Error>746 pub async fn build(self) -> Result<Producer<Exe>, Error> { 747 let ProducerBuilder { 748 pulsar, 749 topic, 750 name, 751 producer_options, 752 } = self; 753 let topic = topic.ok_or(Error::Custom(format!("topic not set")))?; 754 let options = producer_options.unwrap_or_default(); 755 756 let producers: Vec<TopicProducer<Exe>> = try_join_all( 757 pulsar 758 .lookup_partitioned_topic(&topic) 759 .await? 760 .into_iter() 761 .map(|(topic, addr)| { 762 let name = name.clone(); 763 let options = options.clone(); 764 let pulsar = pulsar.clone(); 765 async move { 766 let conn = pulsar.manager.get_connection(&addr).await?; 767 let producer = 768 TopicProducer::from_connection(pulsar, conn, topic, name, options) 769 .await?; 770 Ok::<_, Error>(producer) 771 } 772 }), 773 ) 774 .await?; 775 776 let producer = if producers.len() == 1 { 777 ProducerInner::Single(producers.into_iter().next().unwrap()) 778 } else if producers.len() > 1 { 779 let mut producers = VecDeque::from(producers); 780 // write to topic-1 first 781 producers.rotate_right(1); 782 ProducerInner::Partitioned(PartitionedProducer { 783 producers, 784 topic, 785 options, 786 }) 787 } else { 788 return Err(Error::Custom(format!( 789 "Unexpected error: Partition lookup returned no topics for {}", 790 topic 791 ))); 792 }; 793 Ok(Producer { inner: producer }) 794 } 795 build_multi_topic(self) -> MultiTopicProducer<Exe>796 pub fn build_multi_topic(self) -> MultiTopicProducer<Exe> { 797 MultiTopicProducer { 798 client: self.pulsar, 799 producers: Default::default(), 800 options: self.producer_options.unwrap_or_default(), 801 name: self.name, 802 } 803 } 804 } 805 806 struct Batch { 807 pub length: u32, 808 // put it in a mutex because the design of Producer requires an immutable TopicProducer, 809 // so we cannot have a mutable Batch in a send_raw(&mut self, ...) 810 pub storage: Mutex< 811 VecDeque<( 812 oneshot::Sender<Result<proto::CommandSendReceipt, Error>>, 813 BatchedMessage, 814 )>, 815 >, 816 } 817 818 impl Batch { new(length: u32) -> Batch819 pub fn new(length: u32) -> Batch { 820 Batch { 821 length, 822 storage: Mutex::new(VecDeque::with_capacity(length as usize)), 823 } 824 } 825 is_full(&self) -> bool826 pub fn is_full(&self) -> bool { 827 self.storage.lock().unwrap().len() >= self.length as usize 828 } 829 push_back( &self, msg: ( oneshot::Sender<Result<proto::CommandSendReceipt, Error>>, ProducerMessage, ), )830 pub fn push_back( 831 &self, 832 msg: ( 833 oneshot::Sender<Result<proto::CommandSendReceipt, Error>>, 834 ProducerMessage, 835 ), 836 ) { 837 let (tx, message) = msg; 838 839 let properties = message 840 .properties 841 .into_iter() 842 .map(|(key, value)| proto::KeyValue { key, value }) 843 .collect(); 844 845 let batched = BatchedMessage { 846 metadata: proto::SingleMessageMetadata { 847 properties, 848 partition_key: message.partition_key, 849 payload_size: message.payload.len() as i32, 850 ..Default::default() 851 }, 852 payload: message.payload, 853 }; 854 self.storage.lock().unwrap().push_back((tx, batched)) 855 } 856 get_messages( &self, ) -> Vec<( oneshot::Sender<Result<proto::CommandSendReceipt, Error>>, BatchedMessage, )>857 pub fn get_messages( 858 &self, 859 ) -> Vec<( 860 oneshot::Sender<Result<proto::CommandSendReceipt, Error>>, 861 BatchedMessage, 862 )> { 863 self.storage.lock().unwrap().drain(..).collect() 864 } 865 } 866 867 /// Helper structure to prepare a message 868 /// 869 /// generated with [Producer::create_message] 870 pub struct MessageBuilder<'a, T, Exe: Executor> { 871 producer: &'a mut Producer<Exe>, 872 properties: HashMap<String, String>, 873 partition_key: Option<String>, 874 content: T, 875 } 876 877 impl<'a, Exe: Executor> MessageBuilder<'a, (), Exe> { new(producer: &'a mut Producer<Exe>) -> Self878 pub fn new(producer: &'a mut Producer<Exe>) -> Self { 879 MessageBuilder { 880 producer, 881 properties: HashMap::new(), 882 partition_key: None, 883 content: (), 884 } 885 } 886 } 887 888 impl<'a, T, Exe: Executor> MessageBuilder<'a, T, Exe> { with_content<C>(self, content: C) -> MessageBuilder<'a, C, Exe>889 pub fn with_content<C>(self, content: C) -> MessageBuilder<'a, C, Exe> { 890 MessageBuilder { 891 producer: self.producer, 892 properties: self.properties, 893 partition_key: self.partition_key, 894 content, 895 } 896 } 897 with_partition_key<S: Into<String>>(mut self, partition_key: S) -> Self898 pub fn with_partition_key<S: Into<String>>(mut self, partition_key: S) -> Self { 899 self.partition_key = Some(partition_key.into()); 900 self 901 } 902 with_property<S1: Into<String>, S2: Into<String>>(mut self, key: S1, value: S2) -> Self903 pub fn with_property<S1: Into<String>, S2: Into<String>>(mut self, key: S1, value: S2) -> Self { 904 self.properties.insert(key.into(), value.into()); 905 self 906 } 907 } 908 909 impl<'a, T: SerializeMessage + Sized, Exe: Executor> MessageBuilder<'a, T, Exe> { send(self) -> Result<SendFuture, Error>910 pub async fn send(self) -> Result<SendFuture, Error> { 911 let MessageBuilder { 912 producer, 913 properties, 914 partition_key, 915 content, 916 } = self; 917 918 let mut message = T::serialize_message(content)?; 919 message.properties = properties; 920 message.partition_key = partition_key; 921 producer.send_raw(message.into()).await 922 } 923 } 924