1 //--------------------------------------------------------------------------
2 // Copyright (C) 2014-2021 Cisco and/or its affiliates. All rights reserved.
3 // Copyright (C) 2013-2013 Sourcefire, Inc.
4 //
5 // This program is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU General Public License Version 2 as published
7 // by the Free Software Foundation.  You may not use, modify or distribute
8 // this program under any other version of the GNU General Public License.
9 //
10 // This program is distributed in the hope that it will be useful, but
11 // WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 // General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License along
16 // with this program; if not, write to the Free Software Foundation, Inc.,
17 // 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18 //--------------------------------------------------------------------------
19 // analyzer.cc author Michael Altizer <mialtize@cisco.com>
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include "analyzer.h"
26 
27 #include <daq.h>
28 
#include <mutex>
#include <thread>
30 
31 #include "detection/context_switcher.h"
32 #include "detection/detect.h"
33 #include "detection/detection_engine.h"
34 #include "detection/ips_context.h"
35 #include "detection/tag.h"
36 #include "file_api/file_service.h"
37 #include "filters/detection_filter.h"
38 #include "filters/rate_filter.h"
39 #include "filters/sfrf.h"
40 #include "filters/sfthreshold.h"
41 #include "flow/flow.h"
42 #include "flow/ha.h"
43 #include "framework/data_bus.h"
44 #include "latency/packet_latency.h"
45 #include "latency/rule_latency.h"
46 #include "log/messages.h"
47 #include "main/swapper.h"
48 #include "main.h"
49 #include "managers/action_manager.h"
50 #include "managers/inspector_manager.h"
51 #include "managers/ips_manager.h"
52 #include "managers/event_manager.h"
53 #include "managers/module_manager.h"
54 #include "memory/memory_cap.h"
55 #include "packet_io/active.h"
56 #include "packet_io/sfdaq.h"
57 #include "packet_io/sfdaq_config.h"
58 #include "packet_io/sfdaq_instance.h"
59 #include "packet_io/sfdaq_module.h"
60 #include "packet_tracer/packet_tracer.h"
61 #include "profiler/profiler.h"
62 #include "pub_sub/daq_message_event.h"
63 #include "pub_sub/finalize_packet_event.h"
64 #include "side_channel/side_channel.h"
65 #include "stream/stream.h"
66 #include "target_based/host_attributes.h"
67 #include "time/packet_time.h"
68 #include "trace/trace_api.h"
69 #include "utils/stats.h"
70 
71 #include "analyzer_command.h"
72 #include "oops_handler.h"
73 #include "snort.h"
74 #include "snort_config.h"
75 #include "thread_config.h"
76 
77 using namespace snort;
78 using namespace std;
79 
// Packet processing hook called by process_packet() and Analyzer::inspect_rebuilt().
// Starts out as snort_ignore and is replaced through Analyzer::set_main_hook().
static MainHook_f main_hook = snort_ignore;

// Per-thread profiling stats passed to the Profile objects that wrap DAQ calls in this file.
THREAD_LOCAL ProfileStats daqPerfStats;
// Per-thread pointer to the running Analyzer; assigned in Analyzer::operator()().
static THREAD_LOCAL Analyzer* local_analyzer = nullptr;
84 
85 //-------------------------------------------------------------------------
86 
87 class RetryQueue
88 {
89     struct Entry
90     {
EntryRetryQueue::Entry91         Entry(const struct timeval& next_try, DAQ_Msg_h msg) : next_try(next_try), msg(msg) { }
92 
93         struct timeval next_try;
94         DAQ_Msg_h msg;
95     };
96 
97 public:
RetryQueue(unsigned interval_ms)98     RetryQueue(unsigned interval_ms)
99     {
100         assert(interval_ms > 0);
101         interval = { static_cast<time_t>(interval_ms / 1000), static_cast<suseconds_t>((interval_ms % 1000) * 1000) };
102     }
103 
~RetryQueue()104     ~RetryQueue()
105     {
106         assert(empty());
107     }
108 
put(DAQ_Msg_h msg)109     void put(DAQ_Msg_h msg)
110     {
111         struct timeval now, next_try;
112         packet_gettimeofday(&now);
113         timeradd(&now, &interval, &next_try);
114         queue.emplace_back(next_try, msg);
115     }
116 
get(const struct timeval * now=nullptr)117     DAQ_Msg_h get(const struct timeval* now = nullptr)
118     {
119         if (!empty())
120         {
121             const Entry& entry = queue.front();
122             if (!now || !timercmp(now, &entry.next_try, <))
123             {
124                 DAQ_Msg_h msg = entry.msg;
125                 queue.pop_front();
126                 return msg;
127             }
128         }
129         return nullptr;
130     }
131 
empty() const132     bool empty() const
133     {
134         return queue.empty();
135     }
136 
137 private:
138     deque<Entry> queue;
139     struct timeval interval;
140 };
141 
142 //-------------------------------------------------------------------------
143 
144 /*
145  * Static Class Methods
146  */
get_local_analyzer()147 Analyzer* Analyzer::get_local_analyzer()
148 {
149     return local_analyzer;
150 }
151 
get_switcher()152 ContextSwitcher* Analyzer::get_switcher()
153 {
154     assert(local_analyzer != nullptr);
155     return local_analyzer->switcher;
156 }
157 
set_main_hook(MainHook_f f)158 void Analyzer::set_main_hook(MainHook_f f)
159 {
160     main_hook = f;
161 }
162 
163 //-------------------------------------------------------------------------
164 // message processing
165 //-------------------------------------------------------------------------
166 
process_daq_sof_eof_msg(DAQ_Msg_h msg,DAQ_Verdict & verdict)167 static void process_daq_sof_eof_msg(DAQ_Msg_h msg, DAQ_Verdict& verdict)
168 {
169     const DAQ_FlowStats_t *stats = (const DAQ_FlowStats_t*) daq_msg_get_hdr(msg);
170     const char* key;
171 
172     if (daq_msg_get_type(msg) == DAQ_MSG_TYPE_EOF)
173     {
174         packet_time_update(&stats->eof_timestamp);
175         daq_stats.eof_messages++;
176         key = DAQ_EOF_MSG_EVENT;
177     }
178     else
179     {
180         packet_time_update(&stats->sof_timestamp);
181         daq_stats.sof_messages++;
182         key = DAQ_SOF_MSG_EVENT;
183     }
184 
185     DaqMessageEvent event(msg, verdict);
186     DataBus::publish(key, event);
187 }
188 
process_packet(Packet * p)189 static bool process_packet(Packet* p)
190 {
191     assert(p->pkth && p->pkt);
192 
193     daq_stats.rx_bytes += p->pktlen;
194 
195     PacketTracer::activate(*p);
196 
197     p->user_inspection_policy_id = get_inspection_policy()->user_policy_id;
198     p->user_ips_policy_id = get_ips_policy()->user_policy_id;
199     p->user_network_policy_id = get_network_policy()->user_policy_id;
200 
201     if ( !(p->packet_flags & PKT_IGNORE) )
202     {
203         clear_file_data();
204         // return incomplete status if the main hook indicates not all work was done
205         if (!main_hook(p))
206             return false;
207     }
208 
209     return true;
210 }
211 
212 // Finalize DAQ message verdict
distill_verdict(Packet * p)213 static DAQ_Verdict distill_verdict(Packet* p)
214 {
215     DAQ_Verdict verdict = DAQ_VERDICT_PASS;
216     Active* act = p->active;
217 
218     // First Pass
219     if ( act->session_was_blocked() ||
220             (p->flow && (p->flow->flow_state == Flow::FlowState::BLOCK)) )
221     {
222         if ( !act->can_block() )
223             verdict = DAQ_VERDICT_PASS;
224         else if ( act->get_tunnel_bypass() )
225         {
226             daq_stats.internal_blacklist++;
227             verdict = DAQ_VERDICT_BLOCK;
228         }
229         else if ( p->context->conf->inline_mode() || act->packet_force_dropped() )
230             verdict = DAQ_VERDICT_BLACKLIST;
231         else
232             verdict = DAQ_VERDICT_IGNORE;
233     }
234 
235     // Second Pass, now with more side effects
236     if ( act->packet_was_dropped() && act->can_block() )
237     {
238         if ( verdict == DAQ_VERDICT_PASS )
239             verdict = DAQ_VERDICT_BLOCK;
240     }
241     else if ( p->packet_flags & PKT_RESIZED )
242     {
243         // we never increase, only trim, but daq doesn't support resizing wire packet
244         PacketManager::encode_update(p);
245 
246         if ( p->daq_instance->inject(p->daq_msg, 0, p->pkt, p->pktlen) == DAQ_SUCCESS )
247             verdict = DAQ_VERDICT_BLOCK;
248         // FIXIT-M X Should we be blocking the wire packet even if the injection fails?
249     }
250     else if ( p->packet_flags & PKT_MODIFIED )
251     {
252         // this packet was normalized and/or has replacements
253         PacketManager::encode_update(p);
254         verdict = DAQ_VERDICT_REPLACE;
255     }
256     else if ( act->session_was_trusted() )
257         verdict = DAQ_VERDICT_WHITELIST;
258     else if ( (p->packet_flags & PKT_IGNORE) ||
259         (p->flow &&
260             (p->flow->get_ignore_direction() == SSN_DIR_BOTH ||
261                 p->flow->flow_state == Flow::FlowState::ALLOW)) )
262     {
263         verdict = DAQ_VERDICT_WHITELIST;
264     }
265     else if ( p->ptrs.decode_flags & DECODE_PKT_TRUST )
266     {
267         if ( p->flow )
268             p->flow->set_ignore_direction(SSN_DIR_BOTH);
269         verdict = DAQ_VERDICT_WHITELIST;
270     }
271     else
272         verdict = DAQ_VERDICT_PASS;
273 
274     if (DAQ_VERDICT_WHITELIST == verdict)
275     {
276         if (p->flow && p->flow->cannot_trust())
277             verdict = DAQ_VERDICT_PASS;
278         else if (act->get_tunnel_bypass())
279         {
280             verdict = DAQ_VERDICT_PASS;
281             daq_stats.internal_whitelist++;
282         }
283     }
284     return verdict;
285 }
286 
add_to_retry_queue(DAQ_Msg_h daq_msg)287 void Analyzer::add_to_retry_queue(DAQ_Msg_h daq_msg)
288 {
289     retry_queue->put(daq_msg);
290 }
291 
292 /*
293  * Private message processing methods
294  */
post_process_daq_pkt_msg(Packet * p)295 void Analyzer::post_process_daq_pkt_msg(Packet* p)
296 {
297     bool msg_was_held = false;
298 
299     Active::execute(p);
300 
301     DAQ_Verdict verdict = MAX_DAQ_VERDICT;
302 
303     if (p->active->packet_retry_requested())
304     {
305         add_to_retry_queue(p->daq_msg);
306         daq_stats.retries_queued++;
307     }
308     else
309     {
310         msg_was_held = (p->active->is_packet_held() and Stream::set_packet_action_to_hold(p));
311         if (msg_was_held)
312         {
313             if (p->flow->flags.trigger_detained_packet_event)
314             {
315                 DataBus::publish(DETAINED_PACKET_EVENT, p);
316             }
317         }
318         else
319             verdict = distill_verdict(p);
320     }
321 
322     if (PacketTracer::is_active())
323     {
324         PacketTracer::log("Policies: Network %u, Inspection %u, Detection %u\n",
325             get_network_policy()->user_policy_id, get_inspection_policy()->user_policy_id,
326             get_ips_policy()->user_policy_id);
327 
328         if (p->active->packet_retry_requested())
329             PacketTracer::log("Verdict: Queuing for Retry\n");
330         else if (msg_was_held)
331             PacketTracer::log("Verdict: Holding for Detection\n");
332         else
333             PacketTracer::log("Verdict: %s\n", SFDAQ::verdict_to_string(verdict));
334         PacketTracer::dump(p);
335     }
336 
337     if (PacketTracer::is_daq_activated())
338         PacketTracer::daq_dump(p);
339 
340     HighAvailabilityManager::process_update(p->flow, p);
341 
342     if (verdict != MAX_DAQ_VERDICT)
343     {
344         // Publish an event if something has indicated that it wants the
345         // finalize event on this flow.
346         if (p->flow and p->flow->flags.trigger_finalize_event)
347         {
348             FinalizePacketEvent event(p, verdict);
349             DataBus::publish(FINALIZE_PACKET_EVENT, event);
350         }
351 
352         if (verdict == DAQ_VERDICT_BLOCK or verdict == DAQ_VERDICT_BLACKLIST)
353             p->active->send_reason_to_daq(*p);
354 
355         oops_handler->set_current_message(nullptr);
356         p->pkth = nullptr;  // No longer avail after finalize_message.
357 
358         {
359             Profile profile(daqPerfStats);
360             p->daq_instance->finalize_message(p->daq_msg, verdict);
361         }
362     }
363 }
364 
process_daq_pkt_msg(DAQ_Msg_h msg,bool retry)365 void Analyzer::process_daq_pkt_msg(DAQ_Msg_h msg, bool retry)
366 {
367     const DAQ_PktHdr_t* pkthdr = daq_msg_get_pkthdr(msg);
368 
369     pc.analyzed_pkts++;
370 
371     if (!retry)
372         packet_time_update(&pkthdr->ts);
373 
374     DetectionEngine::wait_for_context();
375     switcher->start();
376 
377     Packet* p = switcher->get_context()->packet;
378     p->context->wire_packet = p;
379     p->context->packet_number = get_packet_number();
380     select_default_policy(pkthdr, p->context->conf);
381 
382     DetectionEngine::reset();
383     sfthreshold_reset();
384     Active::clear_queue(p);
385 
386     p->daq_msg = msg;
387     p->daq_instance = daq_instance;
388 
389     PacketManager::decode(p, pkthdr, daq_msg_get_data(msg), daq_msg_get_data_len(msg), false, retry);
390 
391     if (process_packet(p))
392     {
393         post_process_daq_pkt_msg(p);
394         switcher->stop();
395     }
396 
397     Stream::handle_timeouts(false);
398     HighAvailabilityManager::process_receive();
399 }
400 
process_daq_msg(DAQ_Msg_h msg,bool retry)401 void Analyzer::process_daq_msg(DAQ_Msg_h msg, bool retry)
402 {
403     oops_handler->set_current_message(msg);
404     memory::MemoryCap::free_space();
405 
406     DAQ_Verdict verdict = DAQ_VERDICT_PASS;
407     switch (daq_msg_get_type(msg))
408     {
409         case DAQ_MSG_TYPE_PACKET:
410             process_daq_pkt_msg(msg, retry);
411             // process_daq_pkt_msg() handles finalizing the message (or tracking it if offloaded)
412             return;
413         case DAQ_MSG_TYPE_SOF:
414         case DAQ_MSG_TYPE_EOF:
415             process_daq_sof_eof_msg(msg, verdict);
416             break;
417         default:
418             {
419                 daq_stats.other_messages++;
420                 DaqMessageEvent event(msg, verdict);
421                 DataBus::publish(DAQ_OTHER_MSG_EVENT, event);
422             }
423             break;
424     }
425     oops_handler->set_current_message(nullptr);
426     {
427         Profile profile(daqPerfStats);
428         daq_instance->finalize_message(msg, verdict);
429     }
430 }
431 
process_retry_queue()432 void Analyzer::process_retry_queue()
433 {
434     if (!retry_queue->empty())
435     {
436         struct timeval now;
437         packet_gettimeofday(&now);
438         DAQ_Msg_h msg;
439 
440         while ((msg = retry_queue->get(&now)) != nullptr)
441         {
442             process_daq_msg(msg, true);
443             daq_stats.retries_processed++;
444         }
445     }
446 }
447 
448 /*
449  * Public packet processing methods
450  */
inspect_rebuilt(Packet * p)451 bool Analyzer::inspect_rebuilt(Packet* p)
452 {
453     DetectionEngine de;
454     return main_hook(p);
455 }
456 
process_rebuilt_packet(Packet * p,const DAQ_PktHdr_t * pkthdr,const uint8_t * pkt,uint32_t pktlen)457 bool Analyzer::process_rebuilt_packet(Packet* p, const DAQ_PktHdr_t* pkthdr, const uint8_t* pkt,
458     uint32_t pktlen)
459 {
460     PacketManager::decode(p, pkthdr, pkt, pktlen, true);
461 
462     p->packet_flags |= (PKT_PSEUDO | PKT_REBUILT_FRAG);
463     p->pseudo_type = PSEUDO_PKT_IP;
464 
465     return process_packet(p);
466 }
467 
post_process_packet(Packet * p)468 void Analyzer::post_process_packet(Packet* p)
469 {
470     post_process_daq_pkt_msg(p);
471     // FIXIT-? There is an assumption that this is being called on the active context...
472     switcher->stop();
473 }
474 
finalize_daq_message(DAQ_Msg_h msg,DAQ_Verdict verdict)475 void Analyzer::finalize_daq_message(DAQ_Msg_h msg, DAQ_Verdict verdict)
476 {
477     Profile profile(daqPerfStats);
478     daq_instance->finalize_message(msg, verdict);
479 }
480 
481 //-------------------------------------------------------------------------
482 // Utility
483 //-------------------------------------------------------------------------
484 
show_source()485 void Analyzer::show_source()
486 {
487     const char* pcap = source.c_str();
488 
489     if (!strcmp(pcap, "-"))
490         pcap = "stdin";
491 
492     if (get_run_num() != 1)
493         fprintf(stdout, "%s", "\n");
494 
495     fprintf(stdout, "Reading network traffic from \"%s\" with snaplen = %u\n",
496         pcap, SnortConfig::get_conf()->daq_config->get_mru_size());
497 }
498 
set_state(State s)499 void Analyzer::set_state(State s)
500 {
501     state = s;
502     main_poke(id);
503 }
504 
get_state_string()505 const char* Analyzer::get_state_string()
506 {
507     State s = get_state();  // can't use atomic in switch with optimization
508 
509     switch ( s )
510     {
511         case State::NEW:         return "NEW";
512         case State::INITIALIZED: return "INITIALIZED";
513         case State::STARTED:     return "STARTED";
514         case State::RUNNING:     return "RUNNING";
515         case State::PAUSED:      return "PAUSED";
516         case State::STOPPED:     return "STOPPED";
517         default: assert(false);
518     }
519 
520     return "UNKNOWN";
521 }
522 
523 //-------------------------------------------------------------------------
524 // Thread life cycle
525 //-------------------------------------------------------------------------
526 
idle()527 void Analyzer::idle()
528 {
529     idling = true;
530 
531     // FIXIT-L this whole thing could be pub-sub
532     daq_stats.idle++;
533 
534     // This should only be called if the DAQ timeout elapsed, so increment the packet time
535     // by the DAQ timeout.
536     struct timeval now, increment;
537     unsigned int timeout = SnortConfig::get_conf()->daq_config->timeout;
538     packet_gettimeofday(&now);
539     increment = { static_cast<time_t>(timeout / 1000), static_cast<suseconds_t>((timeout % 1000) * 1000) };
540     timeradd(&now, &increment, &now);
541     packet_time_update(&now);
542 
543     DataBus::publish(THREAD_IDLE_EVENT, nullptr);
544 
545     // Service the retry queue with the new packet time.
546     process_retry_queue();
547 
548     Stream::handle_timeouts(true);
549 
550     HighAvailabilityManager::process_receive();
551 
552     handle_uncompleted_commands();
553 
554     idling = false;
555 }
556 
557 /*
558  * Perform all packet thread initialization actions that can be taken with dropped privileges
559  * and/or must be called after the DAQ module has been started.
560  */
init_unprivileged()561 void Analyzer::init_unprivileged()
562 {
563     // using dummy values until further integration
564     // FIXIT-M max_contexts must be <= DAQ msg pool to avoid permanent stall (offload only)
565     // condition (polling for packets that won't come to resume ready suspends)
566 #ifdef REG_TEST
567     const unsigned max_contexts = 20;
568 #else
569     const unsigned max_contexts = 255;
570 #endif
571 
572     switcher = new ContextSwitcher;
573 
574     for ( unsigned i = 0; i < max_contexts; ++i )
575         switcher->push(new IpsContext);
576 
577     const SnortConfig* sc = SnortConfig::get_conf();
578 
579     // This should be called as soon as possible
580     // to handle all trace log messages
581     TraceApi::thread_init(sc->trace_config);
582 
583     CodecManager::thread_init(sc);
584 
585     // this depends on instantiated daq capabilities
586     // so it is done here instead of init()
587     Active::thread_init(sc);
588 
589     InitTag();
590     EventTrace_Init();
591     detection_filter_init(sc->detection_filter_config);
592 
593     EventManager::open_outputs();
594     IpsManager::setup_options(sc);
595     ActionManager::thread_init(sc);
596     FileService::thread_init();
597     SideChannelManager::thread_init();
598     HighAvailabilityManager::thread_init(); // must be before InspectorManager::thread_init();
599     InspectorManager::thread_init(sc);
600     PacketTracer::thread_init();
601     HostAttributesManager::initialize();
602 
603     // in case there are HA messages waiting, process them first
604     HighAvailabilityManager::process_receive();
605     PacketManager::thread_init();
606 
607     // init filters hash tables that depend on alerts
608     sfthreshold_alloc(sc->threshold_config->memcap, sc->threshold_config->memcap);
609     SFRF_Alloc(sc->rate_filter_config->memcap);
610 }
611 
reinit(const SnortConfig * sc)612 void Analyzer::reinit(const SnortConfig* sc)
613 {
614     InspectorManager::thread_reinit(sc);
615     ActionManager::thread_reinit(sc);
616     TraceApi::thread_reinit(sc->trace_config);
617 }
618 
stop_removed(const SnortConfig * sc)619 void Analyzer::stop_removed(const SnortConfig* sc)
620 {
621     InspectorManager::thread_stop_removed(sc);
622 }
623 
term()624 void Analyzer::term()
625 {
626     const SnortConfig* sc = SnortConfig::get_conf();
627 
628     HighAvailabilityManager::thread_term_beginning();
629 
630     if ( !sc->dirty_pig )
631         Stream::purge_flows();
632 
633     DAQ_Msg_h msg;
634     while ((msg = retry_queue->get()) != nullptr)
635     {
636         daq_stats.retries_discarded++;
637         Profile profile(daqPerfStats);
638         daq_instance->finalize_message(msg, DAQ_VERDICT_BLOCK);
639     }
640 
641     DetectionEngine::idle();
642     InspectorManager::thread_stop(sc);
643     ModuleManager::accumulate();
644     InspectorManager::thread_term();
645     ActionManager::thread_term();
646 
647     IpsManager::clear_options(sc);
648     EventManager::close_outputs();
649     CodecManager::thread_term();
650     HighAvailabilityManager::thread_term();
651     SideChannelManager::thread_term();
652 
653     oops_handler->set_current_message(nullptr);
654 
655     daq_instance->stop();
656     SFDAQ::set_local_instance(nullptr);
657 
658     PacketLatency::tterm();
659     RuleLatency::tterm();
660 
661     Profiler::consolidate_stats();
662 
663     DetectionEngine::thread_term();
664     detection_filter_term();
665     EventTrace_Term();
666     CleanupTag();
667     FileService::thread_term();
668     PacketTracer::thread_term();
669     PacketManager::thread_term();
670 
671     Active::thread_term();
672     delete switcher;
673 
674     sfthreshold_free();
675     RateFilter_Cleanup();
676 
677     TraceApi::thread_term();
678 
679     ModuleManager::accumulate_module("memory");
680 }
681 
Analyzer(SFDAQInstance * instance,unsigned i,const char * s,uint64_t msg_cnt)682 Analyzer::Analyzer(SFDAQInstance* instance, unsigned i, const char* s, uint64_t msg_cnt)
683 {
684     id = i;
685     exit_after_cnt = msg_cnt;
686     source = s ? s : "";
687     daq_instance = instance;
688     oops_handler = new OopsHandler();
689     retry_queue = new RetryQueue(200);
690     set_state(State::NEW);
691 }
692 
~Analyzer()693 Analyzer::~Analyzer()
694 {
695     delete daq_instance;
696     delete oops_handler;
697     delete retry_queue;
698 }
699 
operator ()(Swapper * ps,uint16_t run_num)700 void Analyzer::operator()(Swapper* ps, uint16_t run_num)
701 {
702     oops_handler->tinit();
703 
704     set_thread_type(STHREAD_TYPE_PACKET);
705     set_instance_id(id);
706     set_run_num(run_num);
707     local_analyzer = this;
708 
709     ps->apply(*this);
710 
711     if (SnortConfig::get_conf()->pcap_show())
712         show_source();
713 
714     // init here to pin separately from packet threads
715     DetectionEngine::thread_init();
716 
717     // Perform all packet thread initialization actions that need to be taken with escalated
718     // privileges prior to starting the DAQ module.
719     SnortConfig::get_conf()->thread_config->implement_thread_affinity(
720         STHREAD_TYPE_PACKET, get_instance_id());
721 
722     SFDAQ::set_local_instance(daq_instance);
723     set_state(State::INITIALIZED);
724 
725     Profiler::start();
726 
727     // Start the main loop
728     analyze();
729 
730     Profiler::stop(pc.analyzed_pkts);
731     term();
732 
733     set_state(State::STOPPED);
734 
735     oops_handler->tterm();
736 }
737 
738 /* Note: This will be called from the main thread.  Everything it does must be
739     thread-safe in relation to interactions with the analyzer thread. */
execute(AnalyzerCommand * ac)740 void Analyzer::execute(AnalyzerCommand* ac)
741 {
742     pending_work_queue_mutex.lock();
743     pending_work_queue.push(ac);
744     pending_work_queue_mutex.unlock();
745 
746     /* Break out of the DAQ acquire loop so that the command will be processed.
747         This is explicitly safe to call from another thread. */
748     if ( state >= State::STARTED and state < State::STOPPED and daq_instance )
749         daq_instance->interrupt();
750 }
751 
handle_command()752 bool Analyzer::handle_command()
753 {
754     AnalyzerCommand* ac = nullptr;
755 
756     pending_work_queue_mutex.lock();
757     if (!pending_work_queue.empty())
758     {
759         ac = pending_work_queue.front();
760         pending_work_queue.pop();
761     }
762     pending_work_queue_mutex.unlock();
763 
764     if (!ac)
765         return false;
766 
767     void* ac_state = nullptr;
768     if ( ac->execute(*this, &ac_state) )
769         add_command_to_completed_queue(ac);
770     else
771         add_command_to_uncompleted_queue(ac, ac_state);
772 
773     return true;
774 }
775 
add_command_to_uncompleted_queue(AnalyzerCommand * aci,void * acs)776 void Analyzer::add_command_to_uncompleted_queue(AnalyzerCommand* aci, void* acs)
777 {
778     UncompletedAnalyzerCommand* cac = new UncompletedAnalyzerCommand(aci, acs);
779 
780     uncompleted_work_queue.push_back(cac);
781 }
782 
add_command_to_completed_queue(AnalyzerCommand * ac)783 void Analyzer::add_command_to_completed_queue(AnalyzerCommand* ac)
784 {
785         completed_work_queue_mutex.lock();
786         completed_work_queue.push(ac);
787         completed_work_queue_mutex.unlock();
788 }
789 
handle_commands()790 void Analyzer::handle_commands()
791 {
792     while (handle_command())
793         ;
794 }
795 
handle_uncompleted_commands()796 void Analyzer::handle_uncompleted_commands()
797 {
798     std::list<UncompletedAnalyzerCommand*>::iterator it = uncompleted_work_queue.begin();
799     while (it != uncompleted_work_queue.end() )
800     {
801         UncompletedAnalyzerCommand* cac = *it;
802 
803         if (cac->command->execute(*this, &cac->state) )
804         {
805             add_command_to_completed_queue(cac->command);
806             it = uncompleted_work_queue.erase(it);
807             delete cac;
808         }
809         else
810             ++it;
811     }
812 }
813 
process_messages()814 DAQ_RecvStatus Analyzer::process_messages()
815 {
816     // Max receive becomes the minimum of the configured batch size, the remaining exit_after
817     // count (if requested), and the remaining pause_after count (if requested).
818     unsigned max_recv = daq_instance->get_batch_size();
819     if (exit_after_cnt && exit_after_cnt < max_recv)
820         max_recv = exit_after_cnt;
821     if (pause_after_cnt && pause_after_cnt < max_recv)
822         max_recv = pause_after_cnt;
823 
824     DAQ_RecvStatus rstat;
825     {
826         Profile profile(daqPerfStats);
827         rstat = daq_instance->receive_messages(max_recv);
828     }
829 
830     // Preemptively service available onloads to potentially unblock processing the first message.
831     // This conveniently handles servicing offloads in the no messages received case as well.
832     DetectionEngine::onload();
833 
834     unsigned num_recv = 0;
835     DAQ_Msg_h msg;
836     while ((msg = daq_instance->next_message()) != nullptr)
837     {
838         // Dispose of any messages to be skipped first.
839         if (skip_cnt > 0)
840         {
841             Profile profile(daqPerfStats);
842             daq_stats.skipped++;
843             skip_cnt--;
844             daq_instance->finalize_message(msg, DAQ_VERDICT_PASS);
845             continue;
846         }
847         // FIXIT-M reimplement fail-open capability?
848         num_recv++;
849         // IMPORTANT: process_daq_msg() is responsible for finalizing the messages.
850         process_daq_msg(msg, false);
851         DetectionEngine::onload();
852         process_retry_queue();
853         handle_uncompleted_commands();
854     }
855 
856     if (exit_after_cnt && (exit_after_cnt -= num_recv) == 0)
857         stop();
858     if (pause_after_cnt && (pause_after_cnt -= num_recv) == 0)
859         pause();
860     return rstat;
861 }
862 
analyze()863 void Analyzer::analyze()
864 {
865     while (!exit_requested)
866     {
867         // If we're not in the running state (usually either pre-start or paused),
868         // just keep stalling until something else comes up.
869         if (state != State::RUNNING)
870         {
871             if (!handle_command())
872             {
873                 chrono::milliseconds ms(10);
874                 this_thread::sleep_for(ms);
875             }
876             continue;
877         }
878 
879         // Receive and process a batch of messages.  Evaluate the receive status after processing
880         // the returned messages to determine if we should immediately continue, take the opportunity
881         // to deal with some house cleaning work, or terminate the analyzer thread.
882         DAQ_RecvStatus rstat = process_messages();
883         if (rstat != DAQ_RSTAT_OK && rstat != DAQ_RSTAT_WOULD_BLOCK)
884         {
885             if (rstat == DAQ_RSTAT_TIMEOUT)
886             {
887                 // If the receive timed out, let's do some idle work before continuing.
888                 // FIXIT-L Hitting a one second timeout when attached to any real traffic source
889                 // is extremely unlikely, so relying on anything in thread_idle() ever being
890                 // called is dangerous.
891                 idle();
892             }
893             else if (rstat == DAQ_RSTAT_INTERRUPTED)
894             {
895                 // If the status reports INTERRUPTED because of an interrupt() call, exit_requested should
896                 // be set for the next pass through the main loop.  Use this as a hint to check for analyzer
897                 // commands.
898                 handle_commands();
899             }
900             else
901             {
902                 if (rstat == DAQ_RSTAT_NOBUF)
903                     ErrorMessage("Exhausted the DAQ message pool!\n");
904                 else if (rstat == DAQ_RSTAT_ERROR)
905                     ErrorMessage("Error receiving message from the DAQ instance: %s\n", daq_instance->get_error());
906                 // Implicitly handled:
907                 // DAQ_RSTAT_EOF - File readback completed, job well done; let's get out of here.
908                 // DAQ_RSTAT_INVALID - This really shouldn't happen.
909                 break;
910             }
911         }
912     }
913 }
914 
start()915 void Analyzer::start()
916 {
917     assert(state == State::INITIALIZED);
918 
919     if (!daq_instance->start())
920     {
921         ErrorMessage("Analyzer: Failed to start DAQ instance\n");
922         exit_requested = true;
923     }
924     set_state(State::STARTED);
925 }
926 
run(bool paused)927 void Analyzer::run(bool paused)
928 {
929     assert(state == State::STARTED);
930     init_unprivileged();
931     if ( paused )
932         set_state(State::PAUSED);
933     else
934         set_state(State::RUNNING);
935 }
936 
stop()937 void Analyzer::stop()
938 {
939     exit_requested = true;
940 }
941 
pause()942 void Analyzer::pause()
943 {
944     if (state == State::RUNNING)
945     {
946         set_state(State::PAUSED);
947         LogMessage("== [%u] paused\n", id);
948     }
949     else
950         ErrorMessage("Analyzer: Received PAUSE command while in state %s\n",
951             get_state_string());
952 }
953 
resume(uint64_t msg_cnt)954 void Analyzer::resume(uint64_t msg_cnt)
955 {
956     if (state == State::PAUSED)
957     {
958         set_pause_after_cnt(msg_cnt);
959         set_state(State::RUNNING);
960     }
961     else
962         ErrorMessage("Analyzer: Received RESUME command while in state %s\n",
963             get_state_string());
964 }
965 
reload_daq()966 void Analyzer::reload_daq()
967 {
968     if (daq_instance)
969         daq_instance->reload();
970 }
971 
rotate()972 void Analyzer::rotate()
973 {
974     DataBus::publish(THREAD_ROTATE_EVENT, nullptr);
975 }
976 
977