1 //! Aliases for types defined in the auto-generated bindings.
2 
3 use std::convert::TryFrom;
4 use std::ffi::CStr;
5 use std::{error, fmt};
6 
7 use crate::bindings;
8 use crate::helpers;
9 
10 // TYPES
11 
12 /// Native rdkafka client.
13 pub type RDKafka = bindings::rd_kafka_t;
14 
15 /// Native rdkafka configuration.
16 pub type RDKafkaConf = bindings::rd_kafka_conf_t;
17 
18 /// Native rdkafka message.
19 pub type RDKafkaMessage = bindings::rd_kafka_message_t;
20 
21 /// Native rdkafka topic.
22 pub type RDKafkaTopic = bindings::rd_kafka_topic_t;
23 
24 /// Native rdkafka topic configuration.
25 pub type RDKafkaTopicConf = bindings::rd_kafka_topic_conf_t;
26 
27 /// Native rdkafka topic partition.
28 pub type RDKafkaTopicPartition = bindings::rd_kafka_topic_partition_t;
29 
30 /// Native rdkafka topic partition list.
31 pub type RDKafkaTopicPartitionList = bindings::rd_kafka_topic_partition_list_t;
32 
33 /// Native rdkafka metadata container.
34 pub type RDKafkaMetadata = bindings::rd_kafka_metadata_t;
35 
36 /// Native rdkafka topic information.
37 pub type RDKafkaMetadataTopic = bindings::rd_kafka_metadata_topic_t;
38 
39 /// Native rdkafka partition information.
40 pub type RDKafkaMetadataPartition = bindings::rd_kafka_metadata_partition_t;
41 
42 /// Native rdkafka broker information.
43 pub type RDKafkaMetadataBroker = bindings::rd_kafka_metadata_broker_t;
44 
45 /// Native rdkafka state.
46 pub type RDKafkaState = bindings::rd_kafka_s;
47 
48 /// Native rdkafka list of groups.
49 pub type RDKafkaGroupList = bindings::rd_kafka_group_list;
50 
51 /// Native rdkafka group information.
52 pub type RDKafkaGroupInfo = bindings::rd_kafka_group_info;
53 
54 /// Native rdkafka group member information.
55 pub type RDKafkaGroupMemberInfo = bindings::rd_kafka_group_member_info;
56 
57 /// Native rdkafka group member information.
58 pub type RDKafkaHeaders = bindings::rd_kafka_headers_t;
59 
60 /// Native rdkafka queue.
61 pub type RDKafkaQueue = bindings::rd_kafka_queue_t;
62 
63 /// Native rdkafka new topic object.
64 pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t;
65 
66 /// Native rdkafka delete topic object.
67 pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t;
68 
69 /// Native rdkafka new partitions object.
70 pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t;
71 
72 /// Native rdkafka config resource.
73 pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t;
74 
75 /// Native rdkafka event.
76 pub type RDKafkaEvent = bindings::rd_kafka_event_t;
77 
78 /// Native rdkafka admin options.
79 pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t;
80 
81 /// Native rdkafka topic result.
82 pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t;
83 
84 // ENUMS
85 
86 /// Client types.
87 pub use bindings::rd_kafka_type_t as RDKafkaType;
88 
89 /// Configuration result.
90 pub use bindings::rd_kafka_conf_res_t as RDKafkaConfRes;
91 
92 /// Response error.
93 pub use bindings::rd_kafka_resp_err_t as RDKafkaRespErr;
94 
95 /// Admin operation.
96 pub use bindings::rd_kafka_admin_op_t as RDKafkaAdminOp;
97 
98 /// Config resource type.
99 pub use bindings::rd_kafka_ResourceType_t as RDKafkaResourceType;
100 
101 /// Config source.
102 pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource;
103 
104 // Errors enum
105 
106 /// Native rdkafka error.
107 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
108 pub enum RDKafkaError {
109     #[doc(hidden)]
110     Begin = -200,
111     /// Received message is incorrect.
112     BadMessage = -199,
113     /// Bad/unknown compression.
114     BadCompression = -198,
115     /// Broker is going away.
116     BrokerDestroy = -197,
117     /// Generic failure.
118     Fail = -196,
119     /// Broker transport failure.
120     BrokerTransportFailure = -195,
121     /// Critical system resource.
122     CriticalSystemResource = -194,
123     /// Failed to resolve broker.
124     Resolve = -193,
125     /// Produced message timed out.
126     MessageTimedOut = -192,
127     /// Reached the end of the topic+partition queue on the broker. Not really an error.
128     PartitionEOF = -191,
129     /// Permanent: Partition does not exist in cluster.
130     UnknownPartition = -190,
131     /// File or filesystem error.
132     FileSystem = -189,
133     /// Permanent: Topic does not exist in cluster.
134     UnknownTopic = -188,
135     /// All broker connections are down.
136     AllBrokersDown = -187,
137     /// Invalid argument, or invalid configuration.
138     InvalidArgument = -186,
139     /// Operation timed out.
140     OperationTimedOut = -185,
141     /// Queue is full.
142     QueueFull = -184,
143     /// ISR count < required.acks.
144     ISRInsufficient = -183,
145     /// Broker node update.
146     NodeUpdate = -182,
147     /// SSL error.
148     SSL = -181,
149     /// Waiting for coordinator to become available.
150     WaitingForCoordinator = -180,
151     /// Unknown client group.
152     UnknownGroup = -179,
153     /// Operation in progress.
154     InProgress = -178,
155     /// Previous operation in progress, wait for it to finish.
156     PreviousInProgress = -177,
157     /// This operation would interfere with an existing subscription.
158     ExistingSubscription = -176,
159     /// Assigned partitions (rebalance_cb).
160     AssignPartitions = -175,
161     /// Revoked partitions (rebalance_cb).
162     RevokePartitions = -174,
163     /// Conflicting use.
164     Conflict = -173,
165     /// Wrong state.
166     State = -172,
167     /// Unknown protocol.
168     UnknownProtocol = -171,
169     /// Not implemented.
170     NotImplemented = -170,
171     /// Authentication failure.
172     Authentication = -169,
173     /// No stored offset.
174     NoOffset = -168,
175     /// Outdated.
176     Outdated = -167,
177     /// Timed out in queue.
178     TimedOutQueue = -166,
179     /// Feature not supported by broker.
180     UnsupportedFeature = -165,
181     /// Awaiting cache update.
182     WaitCache = -164,
183     /// Operation interrupted (e.g., due to yield).
184     Interrupted = -163,
185     /// Key serialization error.
186     KeySerialization = -162,
187     /// Value serialization error.
188     ValueSerialization = -161,
189     /// Key deserialization error.
190     KeyDeserialization = -160,
191     /// Value deserialization error.
192     ValueDeserialization = -159,
193     /// Partial response.
194     Partial = -158,
195     /// Modification attempted on read-only object.
196     ReadOnly = -157,
197     /// No such entry or item not found.
198     NoEnt = -156,
199     /// Read underflow.
200     Underflow = -155,
201     /// Invalid type.
202     InvalidType = -154,
203     /// Retry operation.
204     Retry = -153,
205     /// Purged in queue.
206     PurgeQueue = -152,
207     /// Purged in flight.
208     PurgeInflight = -151,
209     /// Fatal error: see rd_kafka_fatal_error().
210     Fatal = -150,
211     /// Inconsistent state.
212     Inconsistent = -149,
213     /// Gap-less ordering would not be guaranteed if proceeding.
214     GaplessGuarantee = -148,
215     /// Maximum poll interval exceeded.
216     PollExceeded = -147,
217     /// Unknown broker.
218     UnknownBroker = -146,
219     /// Functionality not configured.
220     NotConfigured,
221     /// Instance has been fenced.
222     Fenced,
223     /// Application generated error.
224     Application,
225     #[doc(hidden)]
226     End = -100,
227     /// Unknown broker error.
228     Unknown = -1,
229     /// Success.
230     NoError = 0,
231     /// Offset out of range.
232     OffsetOutOfRange = 1,
233     /// Invalid message.
234     InvalidMessage = 2,
235     /// Unknown topic or partition.
236     UnknownTopicOrPartition = 3,
237     /// Invalid message size.
238     InvalidMessageSize = 4,
239     /// Leader not available.
240     LeaderNotAvailable = 5,
241     /// Not leader for partition.
242     NotLeaderForPartition = 6,
243     /// Request timed out.
244     RequestTimedOut = 7,
245     /// Broker not available.
246     BrokerNotAvailable = 8,
247     /// Replica not available.
248     ReplicaNotAvailable = 9,
249     /// Message size too large.
250     MessageSizeTooLarge = 10,
251     /// Stale controller epoch code.
252     StaleControllerEpoch = 11,
253     /// Offset metadata string too large.
254     OffsetMetadataTooLarge = 12,
255     /// Broker disconnected before response received.
256     NetworkException = 13,
257     /// Coordinator load in progress.
258     CoordinatorLoadInProgress = 14,
259     /// Coordinator not available.
260     CoordinatorNotAvailable = 15,
261     /// Not coordinator.
262     NotCoordinator = 16,
263     /// Invalid topic.
264     InvalidTopic = 17,
265     /// Message batch larger than configured server segment size.
266     MessageBatchTooLarge = 18,
267     /// Not enough in-sync replicas.
268     NotEnoughReplicas = 19,
269     /// Message(s) written to insufficient number of in-sync replicas.
270     NotEnoughReplicasAfterAppend = 20,
271     /// Invalid required acks value.
272     InvalidRequiredAcks = 21,
273     /// Specified group generation id is not valid.
274     IllegalGeneration = 22,
275     /// Inconsistent group protocol.
276     InconsistentGroupProtocol = 23,
277     /// Invalid group.id.
278     InvalidGroupId = 24,
279     /// Unknown member.
280     UnknownMemberId = 25,
281     /// Invalid session timeout.
282     InvalidSessionTimeout = 26,
283     /// Group rebalance in progress.
284     RebalanceInProgress = 27,
285     /// Commit offset data size is not valid.
286     InvalidCommitOffsetSize = 28,
287     /// Topic authorization failed.
288     TopicAuthorizationFailed = 29,
289     /// Group authorization failed.
290     GroupAuthorizationFailed = 30,
291     /// Cluster authorization failed.
292     ClusterAuthorizationFailed = 31,
293     /// Invalid timestamp.
294     InvalidTimestamp = 32,
295     /// Unsupported SASL mechanism.
296     UnsupportedSASLMechanism = 33,
297     /// Illegal SASL state.
298     IllegalSASLState = 34,
299     /// Unsupported version.
300     UnsupportedVersion = 35,
301     /// Topic already exists.
302     TopicAlreadyExists = 36,
303     /// Invalid number of partitions.
304     InvalidPartitions = 37,
305     /// Invalid replication factor.
306     InvalidReplicationFactor = 38,
307     /// Invalid replica assignment.
308     InvalidReplicaAssignment = 39,
309     /// Invalid config.
310     InvalidConfig = 40,
311     /// Not controller for cluster.
312     NotController = 41,
313     /// Invalid request.
314     InvalidRequest = 42,
315     /// Message format on broker does not support request.
316     UnsupportedForMessageFormat = 43,
317     /// Policy violation.
318     PolicyViolation = 44,
319     /// Broker received an out of order sequence number.
320     OutOfOrderSequenceNumber = 45,
321     /// Broker received a duplicate sequence number.
322     DuplicateSequenceNumber = 46,
323     /// Producer attempted an operation with an old epoch.
324     InvalidProducerEpoch = 47,
325     /// Producer attempted a transactional operation in an invalid state.
326     InvalidTransactionalState = 48,
327     /// Producer attempted to use a producer id which is currently assigned to
328     /// its transactional id.
329     InvalidProducerIdMapping = 49,
330     /// Transaction timeout is larger than the maxi value allowed by the
331     /// broker's max.transaction.timeout.ms.
332     InvalidTransactionTimeout = 50,
333     /// Producer attempted to update a transaction while another concurrent
334     /// operation on the same transaction was ongoing.
335     ConcurrentTransactions = 51,
336     /// Indicates that the transaction coordinator sending a WriteTxnMarker is
337     /// no longer the current coordinator for a given producer.
338     TransactionCoordinatorFenced = 52,
339     /// Transactional Id authorization failed.
340     TransactionalIdAuthorizationFailed = 53,
341     /// Security features are disabled.
342     SecurityDisabled = 54,
343     /// Operation not attempted.
344     OperationNotAttempted = 55,
345     /// Disk error when trying to access log file on the disk.
346     KafkaStorageError = 56,
347     /// The user-specified log directory is not found in the broker config.
348     LogDirNotFound = 57,
349     /// SASL Authentication failed.
350     SaslAuthenticationFailed = 58,
351     /// Unknown Producer Id.
352     UnknownProducerId = 59,
353     /// Partition reassignment is in progress.
354     ReassignmentInProgress = 60,
355     /// Delegation Token feature is not enabled.
356     DelegationTokenAuthDisabled = 61,
357     /// Delegation Token is not found on server.
358     DelegationTokenNotFound = 62,
359     /// Specified Principal is not valid Owner/Renewer.
360     DelegationTokenOwnerMismatch = 63,
361     /// Delegation Token requests are not allowed on this connection.
362     DelegationTokenRequestNotAllowed = 64,
363     /// Delegation Token authorization failed.
364     DelegationTokenAuthorizationFailed = 65,
365     /// Delegation Token is expired.
366     DelegationTokenExpired = 66,
367     /// Supplied principalType is not supported.
368     InvalidPrincipalType = 67,
369     /// The group is not empty.
370     NonEmptyGroup = 68,
371     /// The group id does not exist.
372     GroupIdNotFound = 69,
373     /// The fetch session ID was not found.
374     FetchSessionIdNotFound = 70,
375     /// The fetch session epoch is invalid.
376     InvalidFetchSessionEpoch = 71,
377     /// No matching listener.
378     ListenerNotFound = 72,
379     /// Topic deletion is disabled.
380     TopicDeletionDisabled = 73,
381     /// Leader epoch is older than broker epoch.
382     FencedLeaderEpoch = 74,
383     /// Leader epoch is newer than broker epoch.
384     UnknownLeaderEpoch = 75,
385     /// Unsupported compression type.
386     UnsupportedCompressionType = 76,
387     /// Broker epoch has changed.
388     StaleBrokerEpoch = 77,
389     /// Leader high watermark is not caught up.
390     OffsetNotAvailable = 78,
391     /// Group member needs a valid member ID.
392     MemberIdRequired = 79,
393     /// Preferred leader was not available.
394     PreferredLeaderNotAvailable = 80,
395     /// Consumer group has reached maximum size.
396     GroupMaxSizeReached = 81,
397     /// Static consumer fenced by other consumer with same group.instance.id.
398     FencedInstanceId = 82,
399     #[doc(hidden)]
400     EndAll,
401 }
402 
403 impl From<RDKafkaRespErr> for RDKafkaError {
from(err: RDKafkaRespErr) -> RDKafkaError404     fn from(err: RDKafkaRespErr) -> RDKafkaError {
405         helpers::rd_kafka_resp_err_t_to_rdkafka_error(err)
406     }
407 }
408 
409 impl fmt::Display for RDKafkaError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result410     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
411         let description = match RDKafkaRespErr::try_from(*self as i32) {
412             Ok(err) => {
413                 let cstr = unsafe { bindings::rd_kafka_err2str(err) };
414                 unsafe { CStr::from_ptr(cstr) }
415                     .to_string_lossy()
416                     .into_owned()
417             }
418             Err(_) => "Unknown error".to_owned(),
419         };
420 
421         write!(f, "{:?} ({})", self, description)
422     }
423 }
424 
425 impl error::Error for RDKafkaError {
description(&self) -> &str426     fn description(&self) -> &str {
427         "Error from underlying rdkafka library"
428     }
429 }
430 
431 #[cfg(test)]
432 mod tests {
433     use super::*;
434 
435     #[test]
test_display_error()436     fn test_display_error() {
437         let error: RDKafkaError = RDKafkaRespErr::RD_KAFKA_RESP_ERR__PARTITION_EOF.into();
438         assert_eq!(
439             "PartitionEOF (Broker: No more messages)",
440             format!("{}", error)
441         );
442         assert_eq!("PartitionEOF", format!("{:?}", error));
443     }
444 }
445