1 //! Utility functions and types.
2 
3 use std::ffi::CStr;
4 use std::fmt;
5 use std::future::Future;
6 use std::ops::Deref;
7 use std::os::raw::c_char;
8 use std::os::raw::c_void;
9 use std::ptr;
10 use std::ptr::NonNull;
11 use std::slice;
12 use std::sync::Arc;
13 use std::time::{Duration, SystemTime, UNIX_EPOCH};
14 
15 use log::trace;
16 
17 use rdkafka_sys as rdsys;
18 
19 /// Returns a tuple representing the version of `librdkafka` in hexadecimal and
20 /// string format.
get_rdkafka_version() -> (u16, String)21 pub fn get_rdkafka_version() -> (u16, String) {
22     let version_number = unsafe { rdsys::rd_kafka_version() } as u16;
23     let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
24     (version_number, c_str.to_string_lossy().into_owned())
25 }
26 
27 /// Specifies a timeout for a Kafka operation.
28 #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
29 pub enum Timeout {
30     /// Time out after the specified duration elapses.
31     After(Duration),
32     /// Block forever.
33     Never,
34 }
35 
36 impl Timeout {
37     /// Converts a timeout to Kafka's expected representation.
as_millis(&self) -> i3238     pub(crate) fn as_millis(&self) -> i32 {
39         match self {
40             Timeout::After(d) => d.as_millis() as i32,
41             Timeout::Never => -1,
42         }
43     }
44 }
45 
46 impl std::ops::SubAssign for Timeout {
sub_assign(&mut self, other: Self)47     fn sub_assign(&mut self, other: Self) {
48         match (self, other) {
49             (Timeout::After(lhs), Timeout::After(rhs)) => *lhs -= rhs,
50             (Timeout::Never, Timeout::After(_)) => (),
51             _ => panic!("subtraction of Timeout::Never is ill-defined"),
52         }
53     }
54 }
55 
56 impl From<Duration> for Timeout {
from(d: Duration) -> Timeout57     fn from(d: Duration) -> Timeout {
58         Timeout::After(d)
59     }
60 }
61 
62 impl From<Option<Duration>> for Timeout {
from(v: Option<Duration>) -> Timeout63     fn from(v: Option<Duration>) -> Timeout {
64         match v {
65             None => Timeout::Never,
66             Some(d) => Timeout::After(d),
67         }
68     }
69 }
70 
71 /// Converts the given time to the number of milliseconds since the Unix epoch.
millis_to_epoch(time: SystemTime) -> i6472 pub fn millis_to_epoch(time: SystemTime) -> i64 {
73     time.duration_since(UNIX_EPOCH)
74         .unwrap_or_else(|_| Duration::from_secs(0))
75         .as_millis() as i64
76 }
77 
78 /// Returns the current time in milliseconds since the Unix epoch.
current_time_millis() -> i6479 pub fn current_time_millis() -> i64 {
80     millis_to_epoch(SystemTime::now())
81 }
82 
83 /// Converts a pointer to an array to an optional slice. If the pointer is null,
84 /// returns `None`.
ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]>85 pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
86     if ptr.is_null() {
87         None
88     } else {
89         Some(slice::from_raw_parts::<T>(ptr as *const T, size))
90     }
91 }
92 
93 /// Converts a pointer to an array to a slice. If the pointer is null or the
94 /// size is zero, returns a zero-length slice..
ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T]95 pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
96     if ptr.is_null() || size == 0 {
97         &[][..]
98     } else {
99         slice::from_raw_parts::<T>(ptr as *const T, size)
100     }
101 }
102 
103 /// Converts Rust data to and from raw pointers.
104 ///
105 /// This conversion is used to pass opaque objects to the C library and vice
106 /// versa.
107 pub trait IntoOpaque: Send + Sync {
108     /// Converts the object into a raw pointer.
as_ptr(&self) -> *mut c_void109     fn as_ptr(&self) -> *mut c_void;
110 
111     /// Converts the raw pointer back to the original Rust object.
from_ptr(_: *mut c_void) -> Self112     unsafe fn from_ptr(_: *mut c_void) -> Self;
113 }
114 
115 impl IntoOpaque for () {
as_ptr(&self) -> *mut c_void116     fn as_ptr(&self) -> *mut c_void {
117         ptr::null_mut()
118     }
119 
from_ptr(_: *mut c_void) -> Self120     unsafe fn from_ptr(_: *mut c_void) -> Self {}
121 }
122 
123 impl IntoOpaque for usize {
as_ptr(&self) -> *mut c_void124     fn as_ptr(&self) -> *mut c_void {
125         *self as *mut usize as *mut c_void
126     }
127 
from_ptr(ptr: *mut c_void) -> Self128     unsafe fn from_ptr(ptr: *mut c_void) -> Self {
129         ptr as usize
130     }
131 }
132 
133 impl<T: Send + Sync> IntoOpaque for Box<T> {
as_ptr(&self) -> *mut c_void134     fn as_ptr(&self) -> *mut c_void {
135         self.as_ref() as *const T as *mut c_void
136     }
137 
from_ptr(ptr: *mut c_void) -> Self138     unsafe fn from_ptr(ptr: *mut c_void) -> Self {
139         Box::from_raw(ptr as *mut T)
140     }
141 }
142 
143 impl<T: Send + Sync> IntoOpaque for Arc<T> {
as_ptr(&self) -> *mut c_void144     fn as_ptr(&self) -> *mut c_void {
145         self.as_ref() as *const T as *mut c_void
146     }
147 
from_ptr(ptr: *mut c_void) -> Self148     unsafe fn from_ptr(ptr: *mut c_void) -> Self {
149         Arc::from_raw(ptr as *mut T)
150     }
151 }
152 
153 // TODO: check if the implementation returns a copy of the data and update the documentation
154 /// Converts a byte array representing a C string into a [`String`].
bytes_cstr_to_owned(bytes_cstr: &[c_char]) -> String155 pub unsafe fn bytes_cstr_to_owned(bytes_cstr: &[c_char]) -> String {
156     CStr::from_ptr(bytes_cstr.as_ptr() as *const c_char)
157         .to_string_lossy()
158         .into_owned()
159 }
160 
161 /// Converts a C string into a [`String`].
cstr_to_owned(cstr: *const c_char) -> String162 pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
163     CStr::from_ptr(cstr as *const c_char)
164         .to_string_lossy()
165         .into_owned()
166 }
167 
168 pub(crate) struct ErrBuf {
169     buf: [c_char; ErrBuf::MAX_ERR_LEN],
170 }
171 
172 impl ErrBuf {
173     const MAX_ERR_LEN: usize = 512;
174 
new() -> ErrBuf175     pub fn new() -> ErrBuf {
176         ErrBuf {
177             buf: [0; ErrBuf::MAX_ERR_LEN],
178         }
179     }
180 
as_mut_ptr(&mut self) -> *mut c_char181     pub fn as_mut_ptr(&mut self) -> *mut c_char {
182         self.buf.as_mut_ptr()
183     }
184 
len(&self) -> usize185     pub fn len(&self) -> usize {
186         self.buf.len()
187     }
188 
to_string(&self) -> String189     pub fn to_string(&self) -> String {
190         unsafe { bytes_cstr_to_owned(&self.buf) }
191     }
192 }
193 
194 impl Default for ErrBuf {
default() -> ErrBuf195     fn default() -> ErrBuf {
196         ErrBuf::new()
197     }
198 }
199 
200 pub(crate) trait WrappedCPointer {
201     type Target;
202 
ptr(&self) -> *mut Self::Target203     fn ptr(&self) -> *mut Self::Target;
204 
is_null(&self) -> bool205     fn is_null(&self) -> bool {
206         self.ptr().is_null()
207     }
208 }
209 
210 /// Converts a container into a C array.
211 pub(crate) trait AsCArray<T: WrappedCPointer> {
as_c_array(&self) -> *mut *mut T::Target212     fn as_c_array(&self) -> *mut *mut T::Target;
213 }
214 
215 impl<T: WrappedCPointer> AsCArray<T> for Vec<T> {
as_c_array(&self) -> *mut *mut T::Target216     fn as_c_array(&self) -> *mut *mut T::Target {
217         self.as_ptr() as *mut *mut T::Target
218     }
219 }
220 
221 pub(crate) struct NativePtr<T>
222 where
223     T: KafkaDrop,
224 {
225     ptr: NonNull<T>,
226 }
227 
228 impl<T> Drop for NativePtr<T>
229 where
230     T: KafkaDrop,
231 {
drop(&mut self)232     fn drop(&mut self) {
233         trace!("Destroying {}: {:?}", T::TYPE, self.ptr);
234         unsafe { T::DROP(self.ptr.as_ptr()) }
235         trace!("Destroyed {}: {:?}", T::TYPE, self.ptr);
236     }
237 }
238 
239 pub(crate) unsafe trait KafkaDrop {
240     const TYPE: &'static str;
241     const DROP: unsafe extern "C" fn(*mut Self);
242 }
243 
244 impl<T> WrappedCPointer for NativePtr<T>
245 where
246     T: KafkaDrop,
247 {
248     type Target = T;
249 
ptr(&self) -> *mut T250     fn ptr(&self) -> *mut T {
251         self.ptr.as_ptr()
252     }
253 }
254 
255 impl<T> Deref for NativePtr<T>
256 where
257     T: KafkaDrop,
258 {
259     type Target = T;
deref(&self) -> &Self::Target260     fn deref(&self) -> &Self::Target {
261         unsafe { self.ptr.as_ref() }
262     }
263 }
264 
265 impl<T> fmt::Debug for NativePtr<T>
266 where
267     T: KafkaDrop,
268 {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result269     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
270         self.ptr.fmt(f)
271     }
272 }
273 
274 impl<T> NativePtr<T>
275 where
276     T: KafkaDrop,
277 {
from_ptr(ptr: *mut T) -> Option<Self>278     pub(crate) unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
279         NonNull::new(ptr).map(|ptr| Self { ptr })
280     }
281 
ptr(&self) -> *mut T282     pub(crate) fn ptr(&self) -> *mut T {
283         self.ptr.as_ptr()
284     }
285 }
286 
287 pub(crate) struct OnDrop<F>(pub F)
288 where
289     F: Fn();
290 
291 impl<F> Drop for OnDrop<F>
292 where
293     F: Fn(),
294 {
drop(&mut self)295     fn drop(&mut self) {
296         (self.0)()
297     }
298 }
299 
300 /// An abstraction over asynchronous runtimes.
301 ///
302 /// There are several asynchronous runtimes available for Rust. By default
303 /// rust-rdkafka uses Tokio, via the [`TokioRuntime`], but it has pluggable
304 /// support for any runtime that can satisfy this trait.
305 ///
306 /// For an example of using the [smol] runtime with rust-rdkafka, see the
307 /// [smol_runtime] example.
308 ///
309 /// [smol]: https://docs.rs/smol
310 /// [smol_runtime]: https://github.com/fede1024/rust-rdkafka/tree/master/examples/smol_runtime.rs
311 pub trait AsyncRuntime {
312     /// The type of the future returned by
313     /// [`delay_for`](AsyncRuntime::delay_for).
314     type Delay: Future<Output = ()> + Send + Unpin;
315 
316     /// Spawns an asynchronous task.
317     ///
318     /// The task should be be polled to completion, unless the runtime exits
319     /// first. With some runtimes this requires an explicit "detach" step.
spawn<T>(task: T) where T: Future<Output = ()> + Send + 'static320     fn spawn<T>(task: T)
321     where
322         T: Future<Output = ()> + Send + 'static;
323 
324     /// Constructs a future that will resolve after `duration` has elapsed.
delay_for(duration: Duration) -> Self::Delay325     fn delay_for(duration: Duration) -> Self::Delay;
326 }
327 
328 /// An [`AsyncRuntime`] implementation backed by [Tokio](tokio).
329 ///
330 /// This runtime is used by default throughout the crate, unless the `tokio`
331 /// feature is disabled.
332 #[cfg(feature = "tokio")]
333 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
334 pub struct TokioRuntime;
335 
336 #[cfg(feature = "tokio")]
337 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
338 impl AsyncRuntime for TokioRuntime {
339     type Delay = tokio::time::Delay;
340 
spawn<T>(task: T) where T: Future<Output = ()> + Send + 'static,341     fn spawn<T>(task: T)
342     where
343         T: Future<Output = ()> + Send + 'static,
344     {
345         tokio::spawn(task);
346     }
347 
delay_for(duration: Duration) -> Self::Delay348     fn delay_for(duration: Duration) -> Self::Delay {
349         tokio::time::delay_for(duration)
350     }
351 }
352