1 /* Copyright (C) 2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 // written by Sascha Steinbiss <sascha@steinbiss.name>
19 
20 use super::mqtt_message::*;
21 use super::parser::*;
22 use crate::applayer::{self, LoggerFlags};
23 use crate::applayer::*;
24 use crate::core::{self, AppProto, Flow, ALPROTO_FAILED, ALPROTO_UNKNOWN, IPPROTO_TCP};
25 use num_traits::FromPrimitive;
26 use nom;
27 use std;
28 use std::ffi::{CStr,CString};
29 use std::mem::transmute;
30 
31 // Used as a special pseudo packet identifier to denote the first CONNECT
32 // packet in a connection. Note that there is no risk of collision with a
33 // parsed packet identifier because in the protocol these are only 16 bit
34 // unsigned.
35 const MQTT_CONNECT_PKT_ID: u32 = std::u32::MAX;
36 // Maximum message length in bytes. If the length of a message exceeds
37 // this value, it will be truncated. Default: 1MB.
38 static mut MAX_MSG_LEN: u32 = 1048576;
39 
40 static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN;
41 
42 #[derive(FromPrimitive, Debug)]
43 #[repr(u32)]
44 pub enum MQTTEvent {
45     MissingConnect = 0,
46     MissingPublish,
47     MissingSubscribe,
48     MissingUnsubscribe,
49     DoubleConnect,
50     UnintroducedMessage,
51     InvalidQosLevel,
52     MissingMsgId,
53     UnassignedMsgtype,
54 }
55 
56 #[derive(Debug)]
57 pub struct MQTTTransaction {
58     tx_id: u64,
59     pkt_id: Option<u32>,
60     pub msg: Vec<MQTTMessage>,
61     complete: bool,
62     toclient: bool,
63     toserver: bool,
64 
65     logged: LoggerFlags,
66     de_state: Option<*mut core::DetectEngineState>,
67     events: *mut core::AppLayerDecoderEvents,
68     tx_data: applayer::AppLayerTxData,
69 }
70 
71 impl MQTTTransaction {
new(msg: MQTTMessage) -> MQTTTransaction72     pub fn new(msg: MQTTMessage) -> MQTTTransaction {
73         let mut m = MQTTTransaction {
74             tx_id: 0,
75             pkt_id: None,
76             complete: false,
77             logged: LoggerFlags::new(),
78             msg: Vec::new(),
79             toclient: false,
80             toserver: false,
81             de_state: None,
82             events: std::ptr::null_mut(),
83             tx_data: applayer::AppLayerTxData::new(),
84         };
85         m.msg.push(msg);
86         return m;
87     }
88 
free(&mut self)89     pub fn free(&mut self) {
90         if self.events != std::ptr::null_mut() {
91             core::sc_app_layer_decoder_events_free_events(&mut self.events);
92         }
93         if let Some(state) = self.de_state {
94             core::sc_detect_engine_state_free(state);
95         }
96     }
97 }
98 
99 impl Drop for MQTTTransaction {
drop(&mut self)100     fn drop(&mut self) {
101         self.free();
102     }
103 }
104 
105 pub struct MQTTState {
106     tx_id: u64,
107     pub protocol_version: u8,
108     transactions: Vec<MQTTTransaction>,
109     connected: bool,
110     skip_request: usize,
111     skip_response: usize,
112     max_msg_len: usize,
113 }
114 
115 impl MQTTState {
new() -> Self116     pub fn new() -> Self {
117         Self {
118             tx_id: 0,
119             protocol_version: 0,
120             transactions: Vec::new(),
121             connected: false,
122             skip_request: 0,
123             skip_response: 0,
124             max_msg_len: unsafe { MAX_MSG_LEN as usize },
125         }
126     }
127 
free_tx(&mut self, tx_id: u64)128     fn free_tx(&mut self, tx_id: u64) {
129         let len = self.transactions.len();
130         let mut found = false;
131         let mut index = 0;
132         for i in 0..len {
133             let tx = &self.transactions[i];
134             if tx.tx_id == tx_id + 1 {
135                 found = true;
136                 index = i;
137                 break;
138             }
139         }
140         if found {
141             self.transactions.remove(index);
142         }
143     }
144 
get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction>145     pub fn get_tx(&mut self, tx_id: u64) -> Option<&MQTTTransaction> {
146         for tx in &mut self.transactions {
147             if tx.tx_id == tx_id + 1 {
148                 return Some(tx);
149             }
150         }
151         return None;
152     }
153 
get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction>154     pub fn get_tx_by_pkt_id(&mut self, pkt_id: u32) -> Option<&mut MQTTTransaction> {
155         for tx in &mut self.transactions {
156             if !tx.complete {
157                 if let Some(mpktid) = tx.pkt_id {
158                     if mpktid == pkt_id {
159                         return Some(tx);
160                     }
161                 }
162             }
163         }
164         return None;
165     }
166 
new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction167     fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
168         let mut tx = MQTTTransaction::new(msg);
169         self.tx_id += 1;
170         tx.tx_id = self.tx_id;
171         if toclient {
172             tx.toclient = true;
173         } else {
174             tx.toserver = true;
175         }
176         return tx;
177     }
178 
179     // Handle a MQTT message depending on the direction and state.
180     // Note that we are trying to only have one mutable reference to msg
181     // and its components, however, since we are in a large match operation,
182     // we cannot pass around and/or store more references or move things
183     // without having to introduce lifetimes etc.
184     // This is the reason for the code duplication below. Maybe there is a
185     // more concise way to do it, but this works for now.
handle_msg(&mut self, msg: MQTTMessage, toclient: bool)186     fn handle_msg(&mut self, msg: MQTTMessage, toclient: bool) {
187         match msg.op {
188             MQTTOperation::CONNECT(ref conn) => {
189                 self.protocol_version = conn.protocol_version;
190                 if self.connected {
191                     let mut tx = self.new_tx(msg, toclient);
192                     MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
193                     self.transactions.push(tx);
194                 } else {
195                     let mut tx = self.new_tx(msg, toclient);
196                     tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
197                     self.transactions.push(tx);
198                 }
199             },
200             MQTTOperation::PUBLISH(ref publish) => {
201                 if !self.connected {
202                     let mut tx = self.new_tx(msg, toclient);
203                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
204                     self.transactions.push(tx);
205                     return;
206                 }
207                 match msg.header.qos_level {
208                     0 => {
209                         // with QOS level 0, we do not need to wait for a
210                         // response
211                         let mut tx = self.new_tx(msg, toclient);
212                         tx.complete = true;
213                         self.transactions.push(tx);
214                     },
215                     1..=2 => {
216                         if let Some(pkt_id) = publish.message_id {
217                             let mut tx = self.new_tx(msg, toclient);
218                             tx.pkt_id = Some(pkt_id as u32);
219                             self.transactions.push(tx);
220                         } else {
221                             let mut tx = self.new_tx(msg, toclient);
222                             MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
223                             self.transactions.push(tx);
224                         }
225                     },
226                     _ => {
227                         let mut tx = self.new_tx(msg, toclient);
228                         MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
229                         self.transactions.push(tx);
230                     }
231                 }
232             },
233             MQTTOperation::SUBSCRIBE(ref subscribe) => {
234                 if !self.connected {
235                     let mut tx = self.new_tx(msg, toclient);
236                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
237                     self.transactions.push(tx);
238                     return;
239                 }
240                 let pkt_id = subscribe.message_id as u32;
241                 match msg.header.qos_level {
242                     0 => {
243                         // with QOS level 0, we do not need to wait for a
244                         // response
245                         let mut tx = self.new_tx(msg, toclient);
246                         tx.complete = true;
247                         self.transactions.push(tx);
248                     },
249                     1..=2 => {
250                         let mut tx = self.new_tx(msg, toclient);
251                         tx.pkt_id = Some(pkt_id);
252                         self.transactions.push(tx);
253                     },
254                     _ => {
255                         let mut tx = self.new_tx(msg, toclient);
256                         MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
257                         self.transactions.push(tx);
258                     }
259                 }
260             },
261             MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
262                 if !self.connected {
263                     let mut tx = self.new_tx(msg, toclient);
264                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
265                     self.transactions.push(tx);
266                     return;
267                 }
268                 let pkt_id = unsubscribe.message_id as u32;
269                 match msg.header.qos_level {
270                     0 => {
271                         // with QOS level 0, we do not need to wait for a
272                         // response
273                         let mut tx = self.new_tx(msg, toclient);
274                         tx.complete = true;
275                         self.transactions.push(tx);
276                     },
277                     1..=2 => {
278                         let mut tx = self.new_tx(msg, toclient);
279                         tx.pkt_id = Some(pkt_id);
280                         self.transactions.push(tx);
281                     },
282                     _ => {
283                         let mut tx = self.new_tx(msg, toclient);
284                         MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
285                         self.transactions.push(tx);
286                     }
287                 }
288             },
289             MQTTOperation::CONNACK(ref _connack) => {
290                 if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
291                     (*tx).msg.push(msg);
292                     (*tx).complete = true;
293                     (*tx).pkt_id = None;
294                     self.connected = true;
295                 } else {
296                     let mut tx = self.new_tx(msg, toclient);
297                     MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
298                     self.transactions.push(tx);
299                 }
300             },
301             MQTTOperation::PUBREC(ref v)
302             | MQTTOperation::PUBREL(ref v) => {
303                 if !self.connected {
304                     let mut tx = self.new_tx(msg, toclient);
305                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
306                     self.transactions.push(tx);
307                     return;
308                 }
309                 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
310                     (*tx).msg.push(msg);
311                 } else {
312                     let mut tx = self.new_tx(msg, toclient);
313                     MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
314                     self.transactions.push(tx);
315                 }
316             },
317             MQTTOperation::PUBACK(ref v)
318             | MQTTOperation::PUBCOMP(ref v) => {
319                 if !self.connected {
320                     let mut tx = self.new_tx(msg, toclient);
321                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
322                     self.transactions.push(tx);
323                     return;
324                 }
325                 if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
326                     (*tx).msg.push(msg);
327                     (*tx).complete = true;
328                     (*tx).pkt_id = None;
329                 } else {
330                     let mut tx = self.new_tx(msg, toclient);
331                     MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
332                     self.transactions.push(tx);
333                 }
334             },
335             MQTTOperation::SUBACK(ref suback) => {
336                 if !self.connected {
337                     let mut tx = self.new_tx(msg, toclient);
338                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
339                     self.transactions.push(tx);
340                     return;
341                 }
342                 if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
343                     (*tx).msg.push(msg);
344                     (*tx).complete = true;
345                     (*tx).pkt_id = None;
346                 } else {
347                     let mut tx = self.new_tx(msg, toclient);
348                     MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
349                     self.transactions.push(tx);
350                 }
351             },
352             MQTTOperation::UNSUBACK(ref unsuback) => {
353                 if !self.connected {
354                     let mut tx = self.new_tx(msg, toclient);
355                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
356                     self.transactions.push(tx);
357                     return;
358                 }
359                 if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
360                     (*tx).msg.push(msg);
361                     (*tx).complete = true;
362                     (*tx).pkt_id = None;
363                 } else {
364                     let mut tx = self.new_tx(msg, toclient);
365                     MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
366                     self.transactions.push(tx);
367                 }
368             },
369             MQTTOperation::UNASSIGNED => {
370                 let mut tx = self.new_tx(msg, toclient);
371                 tx.complete = true;
372                 MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgtype);
373                 self.transactions.push(tx);
374             },
375             MQTTOperation::TRUNCATED(_) => {
376                 let mut tx = self.new_tx(msg, toclient);
377                 tx.complete = true;
378                 self.transactions.push(tx);
379             },
380             MQTTOperation::AUTH(_)
381             | MQTTOperation::DISCONNECT(_) => {
382                 if !self.connected {
383                     let mut tx = self.new_tx(msg, toclient);
384                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
385                     self.transactions.push(tx);
386                     return;
387                 }
388                 let mut tx = self.new_tx(msg, toclient);
389                 tx.complete = true;
390                 self.transactions.push(tx);
391             },
392             MQTTOperation::PINGREQ
393             | MQTTOperation::PINGRESP => {
394                 if !self.connected {
395                     let mut tx = self.new_tx(msg, toclient);
396                     MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
397                     self.transactions.push(tx);
398                     return;
399                 }
400                 let mut tx = self.new_tx(msg, toclient);
401                 tx.complete = true;
402                 self.transactions.push(tx);
403             }
404         }
405     }
406 
parse_request(&mut self, input: &[u8]) -> AppLayerResult407     fn parse_request(&mut self, input: &[u8]) -> AppLayerResult {
408         let mut current = input;
409         if input.len() == 0 {
410             return AppLayerResult::ok();
411         }
412 
413         let mut consumed = 0;
414         SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len());
415         if self.skip_request > 0 {
416             if input.len() <= self.skip_request {
417                 SCLogDebug!("reducing skip_request by {}", input.len());
418                 self.skip_request -= input.len();
419                 return AppLayerResult::ok();
420             } else {
421                 current = &input[self.skip_request..];
422                 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
423                 consumed = self.skip_request;
424                 self.skip_request = 0;
425             }
426         }
427 
428 
429         while current.len() > 0 {
430             let mut skipped = false;
431             SCLogDebug!("request: handling {}", current.len());
432             match parse_message(current, self.protocol_version, self.max_msg_len) {
433                 Ok((rem, msg)) => {
434                     SCLogDebug!("request msg {:?}", msg);
435                     if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
436                         SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
437                         if trunc.skipped_length >= current.len() {
438                             skipped = true;
439                             self.skip_request = trunc.skipped_length - current.len();
440                         } else {
441                             current = &current[trunc.skipped_length..];
442                             self.skip_request = 0;
443                         }
444                     }
445                     self.handle_msg(msg, false);
446                     if skipped {
447                         return AppLayerResult::ok();
448                     }
449                     consumed += current.len() - rem.len();
450                     current = rem;
451                 }
452                 Err(nom::Err::Incomplete(_)) => {
453                         SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
454                         return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
455                 }
456                 Err(_) => {
457                     return AppLayerResult::err();
458                 }
459             }
460         }
461 
462         return AppLayerResult::ok();
463     }
464 
parse_response(&mut self, input: &[u8]) -> AppLayerResult465     fn parse_response(&mut self, input: &[u8]) -> AppLayerResult {
466         let mut current = input;
467         if input.len() == 0 {
468             return AppLayerResult::ok();
469         }
470 
471         let mut consumed = 0;
472         SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len());
473         if self.skip_response > 0 {
474             if input.len() <= self.skip_response {
475                 self.skip_response -= current.len();
476                 return AppLayerResult::ok();
477             } else {
478                 current = &input[self.skip_response..];
479                 SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current);
480                 consumed = self.skip_response;
481                 self.skip_response = 0;
482             }
483         }
484 
485         while current.len() > 0 {
486             let mut skipped = false;
487             SCLogDebug!("response: handling {}", current.len());
488             match parse_message(current, self.protocol_version, self.max_msg_len as usize) {
489                 Ok((rem, msg)) => {
490                     SCLogDebug!("response msg {:?}", msg);
491                     if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
492                         SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len());
493                         if trunc.skipped_length >= current.len() {
494                             skipped = true;
495                             self.skip_response = trunc.skipped_length - current.len();
496                         } else {
497                             current = &current[trunc.skipped_length..];
498                             self.skip_response = 0;
499                         }
500                         SCLogDebug!("skip_response now {}", self.skip_response);
501                     }
502                     self.handle_msg(msg, true);
503                     if skipped {
504                         return AppLayerResult::ok();
505                     }
506                     consumed += current.len() - rem.len();
507                     current = rem;
508                 }
509                 Err(nom::Err::Incomplete(_)) => {
510                     SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len());
511                     return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
512                 }
513                 Err(_) => {
514                     return AppLayerResult::err();
515                 }
516             }
517         }
518 
519         return AppLayerResult::ok();
520     }
521 
set_event(tx: &mut MQTTTransaction, event: MQTTEvent)522     fn set_event(tx: &mut MQTTTransaction, event: MQTTEvent) {
523         let ev = event as u8;
524         core::sc_app_layer_decoder_events_set_event_raw(&mut tx.events, ev);
525     }
526 
tx_iterator( &mut self, min_tx_id: u64, state: &mut u64, ) -> Option<(&MQTTTransaction, u64, bool)>527     fn tx_iterator(
528         &mut self,
529         min_tx_id: u64,
530         state: &mut u64,
531     ) -> Option<(&MQTTTransaction, u64, bool)> {
532         let mut index = *state as usize;
533         let len = self.transactions.len();
534 
535         while index < len {
536             let tx = &self.transactions[index];
537             if tx.tx_id < min_tx_id + 1 {
538                 index += 1;
539                 continue;
540             }
541             *state = index as u64;
542             return Some((tx, tx.tx_id - 1, (len - index) > 1));
543         }
544 
545         return None;
546     }
547 }
548 
549 // C exports.
550 
551 export_tx_get_detect_state!(rs_mqtt_tx_get_detect_state, MQTTTransaction);
552 export_tx_set_detect_state!(rs_mqtt_tx_set_detect_state, MQTTTransaction);
553 
554 #[no_mangle]
rs_mqtt_probing_parser( _flow: *const Flow, _direction: u8, input: *const u8, input_len: u32, _rdir: *mut u8, ) -> AppProto555 pub extern "C" fn rs_mqtt_probing_parser(
556     _flow: *const Flow,
557     _direction: u8,
558     input: *const u8,
559     input_len: u32,
560     _rdir: *mut u8,
561 ) -> AppProto {
562     let buf = build_slice!(input, input_len as usize);
563     match parse_fixed_header(buf) {
564         Ok((_, hdr)) => {
565             // reject unassigned message type
566             if hdr.message_type == MQTTTypeCode::UNASSIGNED {
567                 return unsafe { ALPROTO_FAILED } ;
568             }
569             // with 2 being the highest valid QoS level
570             if hdr.qos_level > 2 {
571                 return unsafe { ALPROTO_FAILED };
572             }
573             return unsafe { ALPROTO_MQTT };
574         },
575         Err(nom::Err::Incomplete(_)) => ALPROTO_UNKNOWN,
576         Err(_) => unsafe { ALPROTO_FAILED }
577     }
578 }
579 
580 #[no_mangle]
rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void581 pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void {
582     let state = MQTTState::new();
583     let boxed = Box::new(state);
584     return unsafe { transmute(boxed) };
585 }
586 
587 #[no_mangle]
rs_mqtt_state_free(state: *mut std::os::raw::c_void)588 pub extern "C" fn rs_mqtt_state_free(state: *mut std::os::raw::c_void) {
589     let _drop: Box<MQTTState> = unsafe { transmute(state) };
590 }
591 
592 #[no_mangle]
rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64)593 pub extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, tx_id: u64) {
594     let state = cast_pointer!(state, MQTTState);
595     state.free_tx(tx_id);
596 }
597 
598 #[no_mangle]
rs_mqtt_parse_request( _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, input: *const u8, input_len: u32, _data: *const std::os::raw::c_void, _flags: u8, ) -> AppLayerResult599 pub extern "C" fn rs_mqtt_parse_request(
600     _flow: *const Flow,
601     state: *mut std::os::raw::c_void,
602     _pstate: *mut std::os::raw::c_void,
603     input: *const u8,
604     input_len: u32,
605     _data: *const std::os::raw::c_void,
606     _flags: u8,
607 ) -> AppLayerResult {
608     let state = cast_pointer!(state, MQTTState);
609     let buf = build_slice!(input, input_len as usize);
610     return state.parse_request(buf).into();
611 }
612 
613 #[no_mangle]
rs_mqtt_parse_response( _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, input: *const u8, input_len: u32, _data: *const std::os::raw::c_void, _flags: u8, ) -> AppLayerResult614 pub extern "C" fn rs_mqtt_parse_response(
615     _flow: *const Flow,
616     state: *mut std::os::raw::c_void,
617     _pstate: *mut std::os::raw::c_void,
618     input: *const u8,
619     input_len: u32,
620     _data: *const std::os::raw::c_void,
621     _flags: u8,
622 ) -> AppLayerResult {
623     let state = cast_pointer!(state, MQTTState);
624     let buf = build_slice!(input, input_len as usize);
625     return state.parse_response(buf).into();
626 }
627 
628 #[no_mangle]
rs_mqtt_state_get_tx( state: *mut std::os::raw::c_void, tx_id: u64, ) -> *mut std::os::raw::c_void629 pub extern "C" fn rs_mqtt_state_get_tx(
630     state: *mut std::os::raw::c_void,
631     tx_id: u64,
632 ) -> *mut std::os::raw::c_void {
633     let state = cast_pointer!(state, MQTTState);
634     match state.get_tx(tx_id) {
635         Some(tx) => {
636             return unsafe { transmute(tx) };
637         }
638         None => {
639             return std::ptr::null_mut();
640         }
641     }
642 }
643 
644 #[no_mangle]
rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64645 pub extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_void) -> u64 {
646     let state = cast_pointer!(state, MQTTState);
647     return state.tx_id;
648 }
649 
650 #[no_mangle]
rs_mqtt_state_progress_completion_status(_direction: u8) -> std::os::raw::c_int651 pub extern "C" fn rs_mqtt_state_progress_completion_status(_direction: u8) -> std::os::raw::c_int {
652     return 1;
653 }
654 
655 #[no_mangle]
rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int656 pub extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int {
657     let tx = cast_pointer!(tx, MQTTTransaction);
658     if tx.toclient {
659         return 1;
660     }
661     return 0;
662 }
663 
664 #[no_mangle]
rs_mqtt_tx_get_alstate_progress( tx: *mut std::os::raw::c_void, direction: u8, ) -> std::os::raw::c_int665 pub extern "C" fn rs_mqtt_tx_get_alstate_progress(
666     tx: *mut std::os::raw::c_void,
667     direction: u8,
668 ) -> std::os::raw::c_int {
669     let tx = cast_pointer!(tx, MQTTTransaction);
670     if tx.complete {
671         if direction == core::STREAM_TOSERVER {
672             if tx.toserver {
673                 return 1;
674             }
675         } else if direction == core::STREAM_TOCLIENT {
676             if tx.toclient {
677                 return 1;
678             }
679         }
680     }
681     return 0;
682 }
683 
684 #[no_mangle]
rs_mqtt_tx_get_logged( _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void, ) -> u32685 pub extern "C" fn rs_mqtt_tx_get_logged(
686     _state: *mut std::os::raw::c_void,
687     tx: *mut std::os::raw::c_void,
688 ) -> u32 {
689     let tx = cast_pointer!(tx, MQTTTransaction);
690     return tx.logged.get();
691 }
692 
693 #[no_mangle]
rs_mqtt_tx_set_logged( _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void, logged: u32, )694 pub extern "C" fn rs_mqtt_tx_set_logged(
695     _state: *mut std::os::raw::c_void,
696     tx: *mut std::os::raw::c_void,
697     logged: u32,
698 ) {
699     let tx = cast_pointer!(tx, MQTTTransaction);
700     tx.logged.set(logged);
701 }
702 
703 #[no_mangle]
rs_mqtt_state_get_events( tx: *mut std::os::raw::c_void, ) -> *mut core::AppLayerDecoderEvents704 pub extern "C" fn rs_mqtt_state_get_events(
705     tx: *mut std::os::raw::c_void,
706 ) -> *mut core::AppLayerDecoderEvents {
707     let tx = cast_pointer!(tx, MQTTTransaction);
708     return tx.events;
709 }
710 
711 #[no_mangle]
rs_mqtt_state_get_event_info_by_id(event_id: std::os::raw::c_int, event_name: *mut *const std::os::raw::c_char, event_type: *mut core::AppLayerEventType) -> i8712 pub extern "C" fn rs_mqtt_state_get_event_info_by_id(event_id: std::os::raw::c_int,
713                                                       event_name: *mut *const std::os::raw::c_char,
714                                                       event_type: *mut core::AppLayerEventType)
715                                                       -> i8
716 {
717     if let Some(e) = FromPrimitive::from_i32(event_id as i32) {
718         let estr = match e {
719             MQTTEvent::MissingConnect      => { "missing_connect\0" },
720             MQTTEvent::MissingPublish      => { "missing_publish\0" },
721             MQTTEvent::MissingSubscribe    => { "missing_subscribe\0" },
722             MQTTEvent::MissingUnsubscribe  => { "missing_unsubscribe\0" },
723             MQTTEvent::DoubleConnect       => { "double_connect\0" },
724             MQTTEvent::UnintroducedMessage => { "unintroduced_message\0" },
725             MQTTEvent::InvalidQosLevel     => { "invalid_qos_level\0" },
726             MQTTEvent::MissingMsgId        => { "missing_msg_id\0" },
727             MQTTEvent::UnassignedMsgtype   => { "unassigned_msg_type\0" },
728         };
729         unsafe{
730             *event_name = estr.as_ptr() as *const std::os::raw::c_char;
731             *event_type = core::APP_LAYER_EVENT_TYPE_TRANSACTION;
732         };
733         0
734     } else {
735         -1
736     }
737 }
738 
739 #[no_mangle]
rs_mqtt_state_get_event_info(event_name: *const std::os::raw::c_char, event_id: *mut std::os::raw::c_int, event_type: *mut core::AppLayerEventType) -> std::os::raw::c_int740 pub extern "C" fn rs_mqtt_state_get_event_info(event_name: *const std::os::raw::c_char,
741                                               event_id: *mut std::os::raw::c_int,
742                                               event_type: *mut core::AppLayerEventType)
743                                               -> std::os::raw::c_int
744 {
745     if event_name == std::ptr::null() { return -1; }
746     let c_event_name: &CStr = unsafe { CStr::from_ptr(event_name) };
747     let event = match c_event_name.to_str() {
748         Ok(s) => {
749             match s {
750                 "missing_connect"      => MQTTEvent::MissingConnect as i32,
751                 "missing_publish"      => MQTTEvent::MissingPublish as i32,
752                 "missing_subscribe"    => MQTTEvent::MissingSubscribe as i32,
753                 "missing_unsubscribe"  => MQTTEvent::MissingUnsubscribe as i32,
754                 "double_connect"       => MQTTEvent::DoubleConnect as i32,
755                 "unintroduced_message" => MQTTEvent::UnintroducedMessage as i32,
756                 "invalid_qos_level"    => MQTTEvent::InvalidQosLevel as i32,
757                 "missing_msg_id"       => MQTTEvent::MissingMsgId as i32,
758                 "unassigned_msg_type"  => MQTTEvent::UnassignedMsgtype as i32,
759                 _                      => -1, // unknown event
760             }
761         },
762         Err(_) => -1, // UTF-8 conversion failed
763     };
764     unsafe{
765         *event_type = core::APP_LAYER_EVENT_TYPE_TRANSACTION;
766         *event_id = event as std::os::raw::c_int;
767     };
768     0
769 }
770 
771 #[no_mangle]
rs_mqtt_state_get_tx_iterator( _ipproto: u8, _alproto: AppProto, state: *mut std::os::raw::c_void, min_tx_id: u64, _max_tx_id: u64, istate: &mut u64, ) -> applayer::AppLayerGetTxIterTuple772 pub extern "C" fn rs_mqtt_state_get_tx_iterator(
773     _ipproto: u8,
774     _alproto: AppProto,
775     state: *mut std::os::raw::c_void,
776     min_tx_id: u64,
777     _max_tx_id: u64,
778     istate: &mut u64,
779 ) -> applayer::AppLayerGetTxIterTuple {
780     let state = cast_pointer!(state, MQTTState);
781     match state.tx_iterator(min_tx_id, istate) {
782         Some((tx, out_tx_id, has_next)) => {
783             let c_tx = unsafe { transmute(tx) };
784             let ires = applayer::AppLayerGetTxIterTuple::with_values(c_tx, out_tx_id, has_next);
785             return ires;
786         }
787         None => {
788             return applayer::AppLayerGetTxIterTuple::not_found();
789         }
790     }
791 }
792 
793 // Parser name as a C style string.
794 const PARSER_NAME: &'static [u8] = b"mqtt\0";
795 
796 export_tx_data_get!(rs_mqtt_get_tx_data, MQTTTransaction);
797 
798 #[no_mangle]
rs_mqtt_register_parser(cfg_max_msg_len: u32)799 pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) {
800     let default_port = CString::new("[1883]").unwrap();
801     let max_msg_len = &mut MAX_MSG_LEN;
802     *max_msg_len = cfg_max_msg_len;
803     let parser = RustParser {
804         name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char,
805         default_port: default_port.as_ptr(),
806         ipproto: IPPROTO_TCP,
807         probe_ts: Some(rs_mqtt_probing_parser),
808         probe_tc: Some(rs_mqtt_probing_parser),
809         min_depth: 0,
810         max_depth: 16,
811         state_new: rs_mqtt_state_new,
812         state_free: rs_mqtt_state_free,
813         tx_free: rs_mqtt_state_tx_free,
814         parse_ts: rs_mqtt_parse_request,
815         parse_tc: rs_mqtt_parse_response,
816         get_tx_count: rs_mqtt_state_get_tx_count,
817         get_tx: rs_mqtt_state_get_tx,
818         tx_get_comp_st: rs_mqtt_state_progress_completion_status,
819         tx_get_progress: rs_mqtt_tx_get_alstate_progress,
820         get_de_state: rs_mqtt_tx_get_detect_state,
821         set_de_state: rs_mqtt_tx_set_detect_state,
822         get_events: Some(rs_mqtt_state_get_events),
823         get_eventinfo: Some(rs_mqtt_state_get_event_info),
824         get_eventinfo_byid: Some(rs_mqtt_state_get_event_info_by_id),
825         localstorage_new: None,
826         localstorage_free: None,
827         get_files: None,
828         get_tx_iterator: Some(rs_mqtt_state_get_tx_iterator),
829         get_tx_data: rs_mqtt_get_tx_data,
830         apply_tx_config: None,
831         flags: APP_LAYER_PARSER_OPT_UNIDIR_TXS,
832         truncate: None,
833     };
834 
835     let ip_proto_str = CString::new("tcp").unwrap();
836 
837     if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
838         let alproto = AppLayerRegisterProtocolDetection(&parser, 1);
839         ALPROTO_MQTT = alproto;
840         if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
841             let _ = AppLayerRegisterParser(&parser, alproto);
842         }
843     } else {
844         SCLogDebug!("Protocol detector and parser disabled for MQTT.");
845     }
846 }
847