1 //! Aliases for types defined in the auto-generated bindings. 2 3 use std::convert::TryFrom; 4 use std::ffi::CStr; 5 use std::{error, fmt}; 6 7 use crate::bindings; 8 use crate::helpers; 9 10 // TYPES 11 12 /// Native rdkafka client. 13 pub type RDKafka = bindings::rd_kafka_t; 14 15 /// Native rdkafka configuration. 16 pub type RDKafkaConf = bindings::rd_kafka_conf_t; 17 18 /// Native rdkafka message. 19 pub type RDKafkaMessage = bindings::rd_kafka_message_t; 20 21 /// Native rdkafka topic. 22 pub type RDKafkaTopic = bindings::rd_kafka_topic_t; 23 24 /// Native rdkafka topic configuration. 25 pub type RDKafkaTopicConf = bindings::rd_kafka_topic_conf_t; 26 27 /// Native rdkafka topic partition. 28 pub type RDKafkaTopicPartition = bindings::rd_kafka_topic_partition_t; 29 30 /// Native rdkafka topic partition list. 31 pub type RDKafkaTopicPartitionList = bindings::rd_kafka_topic_partition_list_t; 32 33 /// Native rdkafka metadata container. 34 pub type RDKafkaMetadata = bindings::rd_kafka_metadata_t; 35 36 /// Native rdkafka topic information. 37 pub type RDKafkaMetadataTopic = bindings::rd_kafka_metadata_topic_t; 38 39 /// Native rdkafka partition information. 40 pub type RDKafkaMetadataPartition = bindings::rd_kafka_metadata_partition_t; 41 42 /// Native rdkafka broker information. 43 pub type RDKafkaMetadataBroker = bindings::rd_kafka_metadata_broker_t; 44 45 /// Native rdkafka state. 46 pub type RDKafkaState = bindings::rd_kafka_s; 47 48 /// Native rdkafka list of groups. 49 pub type RDKafkaGroupList = bindings::rd_kafka_group_list; 50 51 /// Native rdkafka group information. 52 pub type RDKafkaGroupInfo = bindings::rd_kafka_group_info; 53 54 /// Native rdkafka group member information. 55 pub type RDKafkaGroupMemberInfo = bindings::rd_kafka_group_member_info; 56 57 /// Native rdkafka group member information. 58 pub type RDKafkaHeaders = bindings::rd_kafka_headers_t; 59 60 /// Native rdkafka queue. 61 pub type RDKafkaQueue = bindings::rd_kafka_queue_t; 62 63 /// Native rdkafka new topic object. 64 pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t; 65 66 /// Native rdkafka delete topic object. 67 pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t; 68 69 /// Native rdkafka new partitions object. 70 pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t; 71 72 /// Native rdkafka config resource. 73 pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t; 74 75 /// Native rdkafka event. 76 pub type RDKafkaEvent = bindings::rd_kafka_event_t; 77 78 /// Native rdkafka admin options. 79 pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t; 80 81 /// Native rdkafka topic result. 82 pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t; 83 84 // ENUMS 85 86 /// Client types. 87 pub use bindings::rd_kafka_type_t as RDKafkaType; 88 89 /// Configuration result. 90 pub use bindings::rd_kafka_conf_res_t as RDKafkaConfRes; 91 92 /// Response error. 93 pub use bindings::rd_kafka_resp_err_t as RDKafkaRespErr; 94 95 /// Admin operation. 96 pub use bindings::rd_kafka_admin_op_t as RDKafkaAdminOp; 97 98 /// Config resource type. 99 pub use bindings::rd_kafka_ResourceType_t as RDKafkaResourceType; 100 101 /// Config source. 102 pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource; 103 104 // Errors enum 105 106 /// Native rdkafka error. 107 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 108 pub enum RDKafkaError { 109 #[doc(hidden)] 110 Begin = -200, 111 /// Received message is incorrect. 112 BadMessage = -199, 113 /// Bad/unknown compression. 114 BadCompression = -198, 115 /// Broker is going away. 116 BrokerDestroy = -197, 117 /// Generic failure. 118 Fail = -196, 119 /// Broker transport failure. 120 BrokerTransportFailure = -195, 121 /// Critical system resource. 122 CriticalSystemResource = -194, 123 /// Failed to resolve broker. 124 Resolve = -193, 125 /// Produced message timed out. 126 MessageTimedOut = -192, 127 /// Reached the end of the topic+partition queue on the broker. Not really an error. 128 PartitionEOF = -191, 129 /// Permanent: Partition does not exist in cluster. 130 UnknownPartition = -190, 131 /// File or filesystem error. 132 FileSystem = -189, 133 /// Permanent: Topic does not exist in cluster. 134 UnknownTopic = -188, 135 /// All broker connections are down. 136 AllBrokersDown = -187, 137 /// Invalid argument, or invalid configuration. 138 InvalidArgument = -186, 139 /// Operation timed out. 140 OperationTimedOut = -185, 141 /// Queue is full. 142 QueueFull = -184, 143 /// ISR count < required.acks. 144 ISRInsufficient = -183, 145 /// Broker node update. 146 NodeUpdate = -182, 147 /// SSL error. 148 SSL = -181, 149 /// Waiting for coordinator to become available. 150 WaitingForCoordinator = -180, 151 /// Unknown client group. 152 UnknownGroup = -179, 153 /// Operation in progress. 154 InProgress = -178, 155 /// Previous operation in progress, wait for it to finish. 156 PreviousInProgress = -177, 157 /// This operation would interfere with an existing subscription. 158 ExistingSubscription = -176, 159 /// Assigned partitions (rebalance_cb). 160 AssignPartitions = -175, 161 /// Revoked partitions (rebalance_cb). 162 RevokePartitions = -174, 163 /// Conflicting use. 164 Conflict = -173, 165 /// Wrong state. 166 State = -172, 167 /// Unknown protocol. 168 UnknownProtocol = -171, 169 /// Not implemented. 170 NotImplemented = -170, 171 /// Authentication failure. 172 Authentication = -169, 173 /// No stored offset. 174 NoOffset = -168, 175 /// Outdated. 176 Outdated = -167, 177 /// Timed out in queue. 178 TimedOutQueue = -166, 179 /// Feature not supported by broker. 180 UnsupportedFeature = -165, 181 /// Awaiting cache update. 182 WaitCache = -164, 183 /// Operation interrupted (e.g., due to yield). 184 Interrupted = -163, 185 /// Key serialization error. 186 KeySerialization = -162, 187 /// Value serialization error. 188 ValueSerialization = -161, 189 /// Key deserialization error. 190 KeyDeserialization = -160, 191 /// Value deserialization error. 192 ValueDeserialization = -159, 193 /// Partial response. 194 Partial = -158, 195 /// Modification attempted on read-only object. 196 ReadOnly = -157, 197 /// No such entry or item not found. 198 NoEnt = -156, 199 /// Read underflow. 200 Underflow = -155, 201 /// Invalid type. 202 InvalidType = -154, 203 /// Retry operation. 204 Retry = -153, 205 /// Purged in queue. 206 PurgeQueue = -152, 207 /// Purged in flight. 208 PurgeInflight = -151, 209 /// Fatal error: see rd_kafka_fatal_error(). 210 Fatal = -150, 211 /// Inconsistent state. 212 Inconsistent = -149, 213 /// Gap-less ordering would not be guaranteed if proceeding. 214 GaplessGuarantee = -148, 215 /// Maximum poll interval exceeded. 216 PollExceeded = -147, 217 /// Unknown broker. 218 UnknownBroker = -146, 219 /// Functionality not configured. 220 NotConfigured, 221 /// Instance has been fenced. 222 Fenced, 223 /// Application generated error. 224 Application, 225 #[doc(hidden)] 226 End = -100, 227 /// Unknown broker error. 228 Unknown = -1, 229 /// Success. 230 NoError = 0, 231 /// Offset out of range. 232 OffsetOutOfRange = 1, 233 /// Invalid message. 234 InvalidMessage = 2, 235 /// Unknown topic or partition. 236 UnknownTopicOrPartition = 3, 237 /// Invalid message size. 238 InvalidMessageSize = 4, 239 /// Leader not available. 240 LeaderNotAvailable = 5, 241 /// Not leader for partition. 242 NotLeaderForPartition = 6, 243 /// Request timed out. 244 RequestTimedOut = 7, 245 /// Broker not available. 246 BrokerNotAvailable = 8, 247 /// Replica not available. 248 ReplicaNotAvailable = 9, 249 /// Message size too large. 250 MessageSizeTooLarge = 10, 251 /// Stale controller epoch code. 252 StaleControllerEpoch = 11, 253 /// Offset metadata string too large. 254 OffsetMetadataTooLarge = 12, 255 /// Broker disconnected before response received. 256 NetworkException = 13, 257 /// Coordinator load in progress. 258 CoordinatorLoadInProgress = 14, 259 /// Coordinator not available. 260 CoordinatorNotAvailable = 15, 261 /// Not coordinator. 262 NotCoordinator = 16, 263 /// Invalid topic. 264 InvalidTopic = 17, 265 /// Message batch larger than configured server segment size. 266 MessageBatchTooLarge = 18, 267 /// Not enough in-sync replicas. 268 NotEnoughReplicas = 19, 269 /// Message(s) written to insufficient number of in-sync replicas. 270 NotEnoughReplicasAfterAppend = 20, 271 /// Invalid required acks value. 272 InvalidRequiredAcks = 21, 273 /// Specified group generation id is not valid. 274 IllegalGeneration = 22, 275 /// Inconsistent group protocol. 276 InconsistentGroupProtocol = 23, 277 /// Invalid group.id. 278 InvalidGroupId = 24, 279 /// Unknown member. 280 UnknownMemberId = 25, 281 /// Invalid session timeout. 282 InvalidSessionTimeout = 26, 283 /// Group rebalance in progress. 284 RebalanceInProgress = 27, 285 /// Commit offset data size is not valid. 286 InvalidCommitOffsetSize = 28, 287 /// Topic authorization failed. 288 TopicAuthorizationFailed = 29, 289 /// Group authorization failed. 290 GroupAuthorizationFailed = 30, 291 /// Cluster authorization failed. 292 ClusterAuthorizationFailed = 31, 293 /// Invalid timestamp. 294 InvalidTimestamp = 32, 295 /// Unsupported SASL mechanism. 296 UnsupportedSASLMechanism = 33, 297 /// Illegal SASL state. 298 IllegalSASLState = 34, 299 /// Unsupported version. 300 UnsupportedVersion = 35, 301 /// Topic already exists. 302 TopicAlreadyExists = 36, 303 /// Invalid number of partitions. 304 InvalidPartitions = 37, 305 /// Invalid replication factor. 306 InvalidReplicationFactor = 38, 307 /// Invalid replica assignment. 308 InvalidReplicaAssignment = 39, 309 /// Invalid config. 310 InvalidConfig = 40, 311 /// Not controller for cluster. 312 NotController = 41, 313 /// Invalid request. 314 InvalidRequest = 42, 315 /// Message format on broker does not support request. 316 UnsupportedForMessageFormat = 43, 317 /// Policy violation. 318 PolicyViolation = 44, 319 /// Broker received an out of order sequence number. 320 OutOfOrderSequenceNumber = 45, 321 /// Broker received a duplicate sequence number. 322 DuplicateSequenceNumber = 46, 323 /// Producer attempted an operation with an old epoch. 324 InvalidProducerEpoch = 47, 325 /// Producer attempted a transactional operation in an invalid state. 326 InvalidTransactionalState = 48, 327 /// Producer attempted to use a producer id which is currently assigned to 328 /// its transactional id. 329 InvalidProducerIdMapping = 49, 330 /// Transaction timeout is larger than the maxi value allowed by the 331 /// broker's max.transaction.timeout.ms. 332 InvalidTransactionTimeout = 50, 333 /// Producer attempted to update a transaction while another concurrent 334 /// operation on the same transaction was ongoing. 335 ConcurrentTransactions = 51, 336 /// Indicates that the transaction coordinator sending a WriteTxnMarker is 337 /// no longer the current coordinator for a given producer. 338 TransactionCoordinatorFenced = 52, 339 /// Transactional Id authorization failed. 340 TransactionalIdAuthorizationFailed = 53, 341 /// Security features are disabled. 342 SecurityDisabled = 54, 343 /// Operation not attempted. 344 OperationNotAttempted = 55, 345 /// Disk error when trying to access log file on the disk. 346 KafkaStorageError = 56, 347 /// The user-specified log directory is not found in the broker config. 348 LogDirNotFound = 57, 349 /// SASL Authentication failed. 350 SaslAuthenticationFailed = 58, 351 /// Unknown Producer Id. 352 UnknownProducerId = 59, 353 /// Partition reassignment is in progress. 354 ReassignmentInProgress = 60, 355 /// Delegation Token feature is not enabled. 356 DelegationTokenAuthDisabled = 61, 357 /// Delegation Token is not found on server. 358 DelegationTokenNotFound = 62, 359 /// Specified Principal is not valid Owner/Renewer. 360 DelegationTokenOwnerMismatch = 63, 361 /// Delegation Token requests are not allowed on this connection. 362 DelegationTokenRequestNotAllowed = 64, 363 /// Delegation Token authorization failed. 364 DelegationTokenAuthorizationFailed = 65, 365 /// Delegation Token is expired. 366 DelegationTokenExpired = 66, 367 /// Supplied principalType is not supported. 368 InvalidPrincipalType = 67, 369 /// The group is not empty. 370 NonEmptyGroup = 68, 371 /// The group id does not exist. 372 GroupIdNotFound = 69, 373 /// The fetch session ID was not found. 374 FetchSessionIdNotFound = 70, 375 /// The fetch session epoch is invalid. 376 InvalidFetchSessionEpoch = 71, 377 /// No matching listener. 378 ListenerNotFound = 72, 379 /// Topic deletion is disabled. 380 TopicDeletionDisabled = 73, 381 /// Leader epoch is older than broker epoch. 382 FencedLeaderEpoch = 74, 383 /// Leader epoch is newer than broker epoch. 384 UnknownLeaderEpoch = 75, 385 /// Unsupported compression type. 386 UnsupportedCompressionType = 76, 387 /// Broker epoch has changed. 388 StaleBrokerEpoch = 77, 389 /// Leader high watermark is not caught up. 390 OffsetNotAvailable = 78, 391 /// Group member needs a valid member ID. 392 MemberIdRequired = 79, 393 /// Preferred leader was not available. 394 PreferredLeaderNotAvailable = 80, 395 /// Consumer group has reached maximum size. 396 GroupMaxSizeReached = 81, 397 /// Static consumer fenced by other consumer with same group.instance.id. 398 FencedInstanceId = 82, 399 #[doc(hidden)] 400 EndAll, 401 } 402 403 impl From<RDKafkaRespErr> for RDKafkaError { from(err: RDKafkaRespErr) -> RDKafkaError404 fn from(err: RDKafkaRespErr) -> RDKafkaError { 405 helpers::rd_kafka_resp_err_t_to_rdkafka_error(err) 406 } 407 } 408 409 impl fmt::Display for RDKafkaError { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result410 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 411 let description = match RDKafkaRespErr::try_from(*self as i32) { 412 Ok(err) => { 413 let cstr = unsafe { bindings::rd_kafka_err2str(err) }; 414 unsafe { CStr::from_ptr(cstr) } 415 .to_string_lossy() 416 .into_owned() 417 } 418 Err(_) => "Unknown error".to_owned(), 419 }; 420 421 write!(f, "{:?} ({})", self, description) 422 } 423 } 424 425 impl error::Error for RDKafkaError { description(&self) -> &str426 fn description(&self) -> &str { 427 "Error from underlying rdkafka library" 428 } 429 } 430 431 #[cfg(test)] 432 mod tests { 433 use super::*; 434 435 #[test] test_display_error()436 fn test_display_error() { 437 let error: RDKafkaError = RDKafkaRespErr::RD_KAFKA_RESP_ERR__PARTITION_EOF.into(); 438 assert_eq!( 439 "PartitionEOF (Broker: No more messages)", 440 format!("{}", error) 441 ); 442 assert_eq!("PartitionEOF", format!("{:?}", error)); 443 } 444 } 445