1 //--------------------------------------------------------------------------
2 // Copyright (C) 2014-2021 Cisco and/or its affiliates. All rights reserved.
3 // Copyright (C) 2013-2013 Sourcefire, Inc.
4 //
5 // This program is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU General Public License Version 2 as published
7 // by the Free Software Foundation. You may not use, modify or distribute
8 // this program under any other version of the GNU General Public License.
9 //
10 // This program is distributed in the hope that it will be useful, but
11 // WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License along
16 // with this program; if not, write to the Free Software Foundation, Inc.,
17 // 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 //--------------------------------------------------------------------------
19 // analyzer.cc author Michael Altizer <mialtize@cisco.com>
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include "analyzer.h"
26
27 #include <daq.h>
28
29 #include <thread>
30
31 #include "detection/context_switcher.h"
32 #include "detection/detect.h"
33 #include "detection/detection_engine.h"
34 #include "detection/ips_context.h"
35 #include "detection/tag.h"
36 #include "file_api/file_service.h"
37 #include "filters/detection_filter.h"
38 #include "filters/rate_filter.h"
39 #include "filters/sfrf.h"
40 #include "filters/sfthreshold.h"
41 #include "flow/flow.h"
42 #include "flow/ha.h"
43 #include "framework/data_bus.h"
44 #include "latency/packet_latency.h"
45 #include "latency/rule_latency.h"
46 #include "log/messages.h"
47 #include "main/swapper.h"
48 #include "main.h"
49 #include "managers/action_manager.h"
50 #include "managers/inspector_manager.h"
51 #include "managers/ips_manager.h"
52 #include "managers/event_manager.h"
53 #include "managers/module_manager.h"
54 #include "memory/memory_cap.h"
55 #include "packet_io/active.h"
56 #include "packet_io/sfdaq.h"
57 #include "packet_io/sfdaq_config.h"
58 #include "packet_io/sfdaq_instance.h"
59 #include "packet_io/sfdaq_module.h"
60 #include "packet_tracer/packet_tracer.h"
61 #include "profiler/profiler.h"
62 #include "pub_sub/daq_message_event.h"
63 #include "pub_sub/finalize_packet_event.h"
64 #include "side_channel/side_channel.h"
65 #include "stream/stream.h"
66 #include "target_based/host_attributes.h"
67 #include "time/packet_time.h"
68 #include "trace/trace_api.h"
69 #include "utils/stats.h"
70
71 #include "analyzer_command.h"
72 #include "oops_handler.h"
73 #include "snort.h"
74 #include "snort_config.h"
75 #include "thread_config.h"
76
77 using namespace snort;
78 using namespace std;
79
80 static MainHook_f main_hook = snort_ignore;
81
82 THREAD_LOCAL ProfileStats daqPerfStats;
83 static THREAD_LOCAL Analyzer* local_analyzer = nullptr;
84
85 //-------------------------------------------------------------------------
86
87 class RetryQueue
88 {
89 struct Entry
90 {
EntryRetryQueue::Entry91 Entry(const struct timeval& next_try, DAQ_Msg_h msg) : next_try(next_try), msg(msg) { }
92
93 struct timeval next_try;
94 DAQ_Msg_h msg;
95 };
96
97 public:
RetryQueue(unsigned interval_ms)98 RetryQueue(unsigned interval_ms)
99 {
100 assert(interval_ms > 0);
101 interval = { static_cast<time_t>(interval_ms / 1000), static_cast<suseconds_t>((interval_ms % 1000) * 1000) };
102 }
103
~RetryQueue()104 ~RetryQueue()
105 {
106 assert(empty());
107 }
108
put(DAQ_Msg_h msg)109 void put(DAQ_Msg_h msg)
110 {
111 struct timeval now, next_try;
112 packet_gettimeofday(&now);
113 timeradd(&now, &interval, &next_try);
114 queue.emplace_back(next_try, msg);
115 }
116
get(const struct timeval * now=nullptr)117 DAQ_Msg_h get(const struct timeval* now = nullptr)
118 {
119 if (!empty())
120 {
121 const Entry& entry = queue.front();
122 if (!now || !timercmp(now, &entry.next_try, <))
123 {
124 DAQ_Msg_h msg = entry.msg;
125 queue.pop_front();
126 return msg;
127 }
128 }
129 return nullptr;
130 }
131
empty() const132 bool empty() const
133 {
134 return queue.empty();
135 }
136
137 private:
138 deque<Entry> queue;
139 struct timeval interval;
140 };
141
142 //-------------------------------------------------------------------------
143
144 /*
145 * Static Class Methods
146 */
get_local_analyzer()147 Analyzer* Analyzer::get_local_analyzer()
148 {
149 return local_analyzer;
150 }
151
get_switcher()152 ContextSwitcher* Analyzer::get_switcher()
153 {
154 assert(local_analyzer != nullptr);
155 return local_analyzer->switcher;
156 }
157
set_main_hook(MainHook_f f)158 void Analyzer::set_main_hook(MainHook_f f)
159 {
160 main_hook = f;
161 }
162
163 //-------------------------------------------------------------------------
164 // message processing
165 //-------------------------------------------------------------------------
166
process_daq_sof_eof_msg(DAQ_Msg_h msg,DAQ_Verdict & verdict)167 static void process_daq_sof_eof_msg(DAQ_Msg_h msg, DAQ_Verdict& verdict)
168 {
169 const DAQ_FlowStats_t *stats = (const DAQ_FlowStats_t*) daq_msg_get_hdr(msg);
170 const char* key;
171
172 if (daq_msg_get_type(msg) == DAQ_MSG_TYPE_EOF)
173 {
174 packet_time_update(&stats->eof_timestamp);
175 daq_stats.eof_messages++;
176 key = DAQ_EOF_MSG_EVENT;
177 }
178 else
179 {
180 packet_time_update(&stats->sof_timestamp);
181 daq_stats.sof_messages++;
182 key = DAQ_SOF_MSG_EVENT;
183 }
184
185 DaqMessageEvent event(msg, verdict);
186 DataBus::publish(key, event);
187 }
188
process_packet(Packet * p)189 static bool process_packet(Packet* p)
190 {
191 assert(p->pkth && p->pkt);
192
193 daq_stats.rx_bytes += p->pktlen;
194
195 PacketTracer::activate(*p);
196
197 p->user_inspection_policy_id = get_inspection_policy()->user_policy_id;
198 p->user_ips_policy_id = get_ips_policy()->user_policy_id;
199 p->user_network_policy_id = get_network_policy()->user_policy_id;
200
201 if ( !(p->packet_flags & PKT_IGNORE) )
202 {
203 clear_file_data();
204 // return incomplete status if the main hook indicates not all work was done
205 if (!main_hook(p))
206 return false;
207 }
208
209 return true;
210 }
211
212 // Finalize DAQ message verdict
distill_verdict(Packet * p)213 static DAQ_Verdict distill_verdict(Packet* p)
214 {
215 DAQ_Verdict verdict = DAQ_VERDICT_PASS;
216 Active* act = p->active;
217
218 // First Pass
219 if ( act->session_was_blocked() ||
220 (p->flow && (p->flow->flow_state == Flow::FlowState::BLOCK)) )
221 {
222 if ( !act->can_block() )
223 verdict = DAQ_VERDICT_PASS;
224 else if ( act->get_tunnel_bypass() )
225 {
226 daq_stats.internal_blacklist++;
227 verdict = DAQ_VERDICT_BLOCK;
228 }
229 else if ( p->context->conf->inline_mode() || act->packet_force_dropped() )
230 verdict = DAQ_VERDICT_BLACKLIST;
231 else
232 verdict = DAQ_VERDICT_IGNORE;
233 }
234
235 // Second Pass, now with more side effects
236 if ( act->packet_was_dropped() && act->can_block() )
237 {
238 if ( verdict == DAQ_VERDICT_PASS )
239 verdict = DAQ_VERDICT_BLOCK;
240 }
241 else if ( p->packet_flags & PKT_RESIZED )
242 {
243 // we never increase, only trim, but daq doesn't support resizing wire packet
244 PacketManager::encode_update(p);
245
246 if ( p->daq_instance->inject(p->daq_msg, 0, p->pkt, p->pktlen) == DAQ_SUCCESS )
247 verdict = DAQ_VERDICT_BLOCK;
248 // FIXIT-M X Should we be blocking the wire packet even if the injection fails?
249 }
250 else if ( p->packet_flags & PKT_MODIFIED )
251 {
252 // this packet was normalized and/or has replacements
253 PacketManager::encode_update(p);
254 verdict = DAQ_VERDICT_REPLACE;
255 }
256 else if ( act->session_was_trusted() )
257 verdict = DAQ_VERDICT_WHITELIST;
258 else if ( (p->packet_flags & PKT_IGNORE) ||
259 (p->flow &&
260 (p->flow->get_ignore_direction() == SSN_DIR_BOTH ||
261 p->flow->flow_state == Flow::FlowState::ALLOW)) )
262 {
263 verdict = DAQ_VERDICT_WHITELIST;
264 }
265 else if ( p->ptrs.decode_flags & DECODE_PKT_TRUST )
266 {
267 if ( p->flow )
268 p->flow->set_ignore_direction(SSN_DIR_BOTH);
269 verdict = DAQ_VERDICT_WHITELIST;
270 }
271 else
272 verdict = DAQ_VERDICT_PASS;
273
274 if (DAQ_VERDICT_WHITELIST == verdict)
275 {
276 if (p->flow && p->flow->cannot_trust())
277 verdict = DAQ_VERDICT_PASS;
278 else if (act->get_tunnel_bypass())
279 {
280 verdict = DAQ_VERDICT_PASS;
281 daq_stats.internal_whitelist++;
282 }
283 }
284 return verdict;
285 }
286
add_to_retry_queue(DAQ_Msg_h daq_msg)287 void Analyzer::add_to_retry_queue(DAQ_Msg_h daq_msg)
288 {
289 retry_queue->put(daq_msg);
290 }
291
292 /*
293 * Private message processing methods
294 */
post_process_daq_pkt_msg(Packet * p)295 void Analyzer::post_process_daq_pkt_msg(Packet* p)
296 {
297 bool msg_was_held = false;
298
299 Active::execute(p);
300
301 DAQ_Verdict verdict = MAX_DAQ_VERDICT;
302
303 if (p->active->packet_retry_requested())
304 {
305 add_to_retry_queue(p->daq_msg);
306 daq_stats.retries_queued++;
307 }
308 else
309 {
310 msg_was_held = (p->active->is_packet_held() and Stream::set_packet_action_to_hold(p));
311 if (msg_was_held)
312 {
313 if (p->flow->flags.trigger_detained_packet_event)
314 {
315 DataBus::publish(DETAINED_PACKET_EVENT, p);
316 }
317 }
318 else
319 verdict = distill_verdict(p);
320 }
321
322 if (PacketTracer::is_active())
323 {
324 PacketTracer::log("Policies: Network %u, Inspection %u, Detection %u\n",
325 get_network_policy()->user_policy_id, get_inspection_policy()->user_policy_id,
326 get_ips_policy()->user_policy_id);
327
328 if (p->active->packet_retry_requested())
329 PacketTracer::log("Verdict: Queuing for Retry\n");
330 else if (msg_was_held)
331 PacketTracer::log("Verdict: Holding for Detection\n");
332 else
333 PacketTracer::log("Verdict: %s\n", SFDAQ::verdict_to_string(verdict));
334 PacketTracer::dump(p);
335 }
336
337 if (PacketTracer::is_daq_activated())
338 PacketTracer::daq_dump(p);
339
340 HighAvailabilityManager::process_update(p->flow, p);
341
342 if (verdict != MAX_DAQ_VERDICT)
343 {
344 // Publish an event if something has indicated that it wants the
345 // finalize event on this flow.
346 if (p->flow and p->flow->flags.trigger_finalize_event)
347 {
348 FinalizePacketEvent event(p, verdict);
349 DataBus::publish(FINALIZE_PACKET_EVENT, event);
350 }
351
352 if (verdict == DAQ_VERDICT_BLOCK or verdict == DAQ_VERDICT_BLACKLIST)
353 p->active->send_reason_to_daq(*p);
354
355 oops_handler->set_current_message(nullptr);
356 p->pkth = nullptr; // No longer avail after finalize_message.
357
358 {
359 Profile profile(daqPerfStats);
360 p->daq_instance->finalize_message(p->daq_msg, verdict);
361 }
362 }
363 }
364
process_daq_pkt_msg(DAQ_Msg_h msg,bool retry)365 void Analyzer::process_daq_pkt_msg(DAQ_Msg_h msg, bool retry)
366 {
367 const DAQ_PktHdr_t* pkthdr = daq_msg_get_pkthdr(msg);
368
369 pc.analyzed_pkts++;
370
371 if (!retry)
372 packet_time_update(&pkthdr->ts);
373
374 DetectionEngine::wait_for_context();
375 switcher->start();
376
377 Packet* p = switcher->get_context()->packet;
378 p->context->wire_packet = p;
379 p->context->packet_number = get_packet_number();
380 select_default_policy(pkthdr, p->context->conf);
381
382 DetectionEngine::reset();
383 sfthreshold_reset();
384 Active::clear_queue(p);
385
386 p->daq_msg = msg;
387 p->daq_instance = daq_instance;
388
389 PacketManager::decode(p, pkthdr, daq_msg_get_data(msg), daq_msg_get_data_len(msg), false, retry);
390
391 if (process_packet(p))
392 {
393 post_process_daq_pkt_msg(p);
394 switcher->stop();
395 }
396
397 Stream::handle_timeouts(false);
398 HighAvailabilityManager::process_receive();
399 }
400
process_daq_msg(DAQ_Msg_h msg,bool retry)401 void Analyzer::process_daq_msg(DAQ_Msg_h msg, bool retry)
402 {
403 oops_handler->set_current_message(msg);
404 memory::MemoryCap::free_space();
405
406 DAQ_Verdict verdict = DAQ_VERDICT_PASS;
407 switch (daq_msg_get_type(msg))
408 {
409 case DAQ_MSG_TYPE_PACKET:
410 process_daq_pkt_msg(msg, retry);
411 // process_daq_pkt_msg() handles finalizing the message (or tracking it if offloaded)
412 return;
413 case DAQ_MSG_TYPE_SOF:
414 case DAQ_MSG_TYPE_EOF:
415 process_daq_sof_eof_msg(msg, verdict);
416 break;
417 default:
418 {
419 daq_stats.other_messages++;
420 DaqMessageEvent event(msg, verdict);
421 DataBus::publish(DAQ_OTHER_MSG_EVENT, event);
422 }
423 break;
424 }
425 oops_handler->set_current_message(nullptr);
426 {
427 Profile profile(daqPerfStats);
428 daq_instance->finalize_message(msg, verdict);
429 }
430 }
431
process_retry_queue()432 void Analyzer::process_retry_queue()
433 {
434 if (!retry_queue->empty())
435 {
436 struct timeval now;
437 packet_gettimeofday(&now);
438 DAQ_Msg_h msg;
439
440 while ((msg = retry_queue->get(&now)) != nullptr)
441 {
442 process_daq_msg(msg, true);
443 daq_stats.retries_processed++;
444 }
445 }
446 }
447
448 /*
449 * Public packet processing methods
450 */
inspect_rebuilt(Packet * p)451 bool Analyzer::inspect_rebuilt(Packet* p)
452 {
453 DetectionEngine de;
454 return main_hook(p);
455 }
456
process_rebuilt_packet(Packet * p,const DAQ_PktHdr_t * pkthdr,const uint8_t * pkt,uint32_t pktlen)457 bool Analyzer::process_rebuilt_packet(Packet* p, const DAQ_PktHdr_t* pkthdr, const uint8_t* pkt,
458 uint32_t pktlen)
459 {
460 PacketManager::decode(p, pkthdr, pkt, pktlen, true);
461
462 p->packet_flags |= (PKT_PSEUDO | PKT_REBUILT_FRAG);
463 p->pseudo_type = PSEUDO_PKT_IP;
464
465 return process_packet(p);
466 }
467
post_process_packet(Packet * p)468 void Analyzer::post_process_packet(Packet* p)
469 {
470 post_process_daq_pkt_msg(p);
471 // FIXIT-? There is an assumption that this is being called on the active context...
472 switcher->stop();
473 }
474
finalize_daq_message(DAQ_Msg_h msg,DAQ_Verdict verdict)475 void Analyzer::finalize_daq_message(DAQ_Msg_h msg, DAQ_Verdict verdict)
476 {
477 Profile profile(daqPerfStats);
478 daq_instance->finalize_message(msg, verdict);
479 }
480
481 //-------------------------------------------------------------------------
482 // Utility
483 //-------------------------------------------------------------------------
484
show_source()485 void Analyzer::show_source()
486 {
487 const char* pcap = source.c_str();
488
489 if (!strcmp(pcap, "-"))
490 pcap = "stdin";
491
492 if (get_run_num() != 1)
493 fprintf(stdout, "%s", "\n");
494
495 fprintf(stdout, "Reading network traffic from \"%s\" with snaplen = %u\n",
496 pcap, SnortConfig::get_conf()->daq_config->get_mru_size());
497 }
498
set_state(State s)499 void Analyzer::set_state(State s)
500 {
501 state = s;
502 main_poke(id);
503 }
504
get_state_string()505 const char* Analyzer::get_state_string()
506 {
507 State s = get_state(); // can't use atomic in switch with optimization
508
509 switch ( s )
510 {
511 case State::NEW: return "NEW";
512 case State::INITIALIZED: return "INITIALIZED";
513 case State::STARTED: return "STARTED";
514 case State::RUNNING: return "RUNNING";
515 case State::PAUSED: return "PAUSED";
516 case State::STOPPED: return "STOPPED";
517 default: assert(false);
518 }
519
520 return "UNKNOWN";
521 }
522
523 //-------------------------------------------------------------------------
524 // Thread life cycle
525 //-------------------------------------------------------------------------
526
idle()527 void Analyzer::idle()
528 {
529 idling = true;
530
531 // FIXIT-L this whole thing could be pub-sub
532 daq_stats.idle++;
533
534 // This should only be called if the DAQ timeout elapsed, so increment the packet time
535 // by the DAQ timeout.
536 struct timeval now, increment;
537 unsigned int timeout = SnortConfig::get_conf()->daq_config->timeout;
538 packet_gettimeofday(&now);
539 increment = { static_cast<time_t>(timeout / 1000), static_cast<suseconds_t>((timeout % 1000) * 1000) };
540 timeradd(&now, &increment, &now);
541 packet_time_update(&now);
542
543 DataBus::publish(THREAD_IDLE_EVENT, nullptr);
544
545 // Service the retry queue with the new packet time.
546 process_retry_queue();
547
548 Stream::handle_timeouts(true);
549
550 HighAvailabilityManager::process_receive();
551
552 handle_uncompleted_commands();
553
554 idling = false;
555 }
556
557 /*
558 * Perform all packet thread initialization actions that can be taken with dropped privileges
559 * and/or must be called after the DAQ module has been started.
560 */
init_unprivileged()561 void Analyzer::init_unprivileged()
562 {
563 // using dummy values until further integration
564 // FIXIT-M max_contexts must be <= DAQ msg pool to avoid permanent stall (offload only)
565 // condition (polling for packets that won't come to resume ready suspends)
566 #ifdef REG_TEST
567 const unsigned max_contexts = 20;
568 #else
569 const unsigned max_contexts = 255;
570 #endif
571
572 switcher = new ContextSwitcher;
573
574 for ( unsigned i = 0; i < max_contexts; ++i )
575 switcher->push(new IpsContext);
576
577 const SnortConfig* sc = SnortConfig::get_conf();
578
579 // This should be called as soon as possible
580 // to handle all trace log messages
581 TraceApi::thread_init(sc->trace_config);
582
583 CodecManager::thread_init(sc);
584
585 // this depends on instantiated daq capabilities
586 // so it is done here instead of init()
587 Active::thread_init(sc);
588
589 InitTag();
590 EventTrace_Init();
591 detection_filter_init(sc->detection_filter_config);
592
593 EventManager::open_outputs();
594 IpsManager::setup_options(sc);
595 ActionManager::thread_init(sc);
596 FileService::thread_init();
597 SideChannelManager::thread_init();
598 HighAvailabilityManager::thread_init(); // must be before InspectorManager::thread_init();
599 InspectorManager::thread_init(sc);
600 PacketTracer::thread_init();
601 HostAttributesManager::initialize();
602
603 // in case there are HA messages waiting, process them first
604 HighAvailabilityManager::process_receive();
605 PacketManager::thread_init();
606
607 // init filters hash tables that depend on alerts
608 sfthreshold_alloc(sc->threshold_config->memcap, sc->threshold_config->memcap);
609 SFRF_Alloc(sc->rate_filter_config->memcap);
610 }
611
reinit(const SnortConfig * sc)612 void Analyzer::reinit(const SnortConfig* sc)
613 {
614 InspectorManager::thread_reinit(sc);
615 ActionManager::thread_reinit(sc);
616 TraceApi::thread_reinit(sc->trace_config);
617 }
618
stop_removed(const SnortConfig * sc)619 void Analyzer::stop_removed(const SnortConfig* sc)
620 {
621 InspectorManager::thread_stop_removed(sc);
622 }
623
term()624 void Analyzer::term()
625 {
626 const SnortConfig* sc = SnortConfig::get_conf();
627
628 HighAvailabilityManager::thread_term_beginning();
629
630 if ( !sc->dirty_pig )
631 Stream::purge_flows();
632
633 DAQ_Msg_h msg;
634 while ((msg = retry_queue->get()) != nullptr)
635 {
636 daq_stats.retries_discarded++;
637 Profile profile(daqPerfStats);
638 daq_instance->finalize_message(msg, DAQ_VERDICT_BLOCK);
639 }
640
641 DetectionEngine::idle();
642 InspectorManager::thread_stop(sc);
643 ModuleManager::accumulate();
644 InspectorManager::thread_term();
645 ActionManager::thread_term();
646
647 IpsManager::clear_options(sc);
648 EventManager::close_outputs();
649 CodecManager::thread_term();
650 HighAvailabilityManager::thread_term();
651 SideChannelManager::thread_term();
652
653 oops_handler->set_current_message(nullptr);
654
655 daq_instance->stop();
656 SFDAQ::set_local_instance(nullptr);
657
658 PacketLatency::tterm();
659 RuleLatency::tterm();
660
661 Profiler::consolidate_stats();
662
663 DetectionEngine::thread_term();
664 detection_filter_term();
665 EventTrace_Term();
666 CleanupTag();
667 FileService::thread_term();
668 PacketTracer::thread_term();
669 PacketManager::thread_term();
670
671 Active::thread_term();
672 delete switcher;
673
674 sfthreshold_free();
675 RateFilter_Cleanup();
676
677 TraceApi::thread_term();
678
679 ModuleManager::accumulate_module("memory");
680 }
681
Analyzer(SFDAQInstance * instance,unsigned i,const char * s,uint64_t msg_cnt)682 Analyzer::Analyzer(SFDAQInstance* instance, unsigned i, const char* s, uint64_t msg_cnt)
683 {
684 id = i;
685 exit_after_cnt = msg_cnt;
686 source = s ? s : "";
687 daq_instance = instance;
688 oops_handler = new OopsHandler();
689 retry_queue = new RetryQueue(200);
690 set_state(State::NEW);
691 }
692
~Analyzer()693 Analyzer::~Analyzer()
694 {
695 delete daq_instance;
696 delete oops_handler;
697 delete retry_queue;
698 }
699
operator ()(Swapper * ps,uint16_t run_num)700 void Analyzer::operator()(Swapper* ps, uint16_t run_num)
701 {
702 oops_handler->tinit();
703
704 set_thread_type(STHREAD_TYPE_PACKET);
705 set_instance_id(id);
706 set_run_num(run_num);
707 local_analyzer = this;
708
709 ps->apply(*this);
710
711 if (SnortConfig::get_conf()->pcap_show())
712 show_source();
713
714 // init here to pin separately from packet threads
715 DetectionEngine::thread_init();
716
717 // Perform all packet thread initialization actions that need to be taken with escalated
718 // privileges prior to starting the DAQ module.
719 SnortConfig::get_conf()->thread_config->implement_thread_affinity(
720 STHREAD_TYPE_PACKET, get_instance_id());
721
722 SFDAQ::set_local_instance(daq_instance);
723 set_state(State::INITIALIZED);
724
725 Profiler::start();
726
727 // Start the main loop
728 analyze();
729
730 Profiler::stop(pc.analyzed_pkts);
731 term();
732
733 set_state(State::STOPPED);
734
735 oops_handler->tterm();
736 }
737
738 /* Note: This will be called from the main thread. Everything it does must be
739 thread-safe in relation to interactions with the analyzer thread. */
execute(AnalyzerCommand * ac)740 void Analyzer::execute(AnalyzerCommand* ac)
741 {
742 pending_work_queue_mutex.lock();
743 pending_work_queue.push(ac);
744 pending_work_queue_mutex.unlock();
745
746 /* Break out of the DAQ acquire loop so that the command will be processed.
747 This is explicitly safe to call from another thread. */
748 if ( state >= State::STARTED and state < State::STOPPED and daq_instance )
749 daq_instance->interrupt();
750 }
751
handle_command()752 bool Analyzer::handle_command()
753 {
754 AnalyzerCommand* ac = nullptr;
755
756 pending_work_queue_mutex.lock();
757 if (!pending_work_queue.empty())
758 {
759 ac = pending_work_queue.front();
760 pending_work_queue.pop();
761 }
762 pending_work_queue_mutex.unlock();
763
764 if (!ac)
765 return false;
766
767 void* ac_state = nullptr;
768 if ( ac->execute(*this, &ac_state) )
769 add_command_to_completed_queue(ac);
770 else
771 add_command_to_uncompleted_queue(ac, ac_state);
772
773 return true;
774 }
775
add_command_to_uncompleted_queue(AnalyzerCommand * aci,void * acs)776 void Analyzer::add_command_to_uncompleted_queue(AnalyzerCommand* aci, void* acs)
777 {
778 UncompletedAnalyzerCommand* cac = new UncompletedAnalyzerCommand(aci, acs);
779
780 uncompleted_work_queue.push_back(cac);
781 }
782
add_command_to_completed_queue(AnalyzerCommand * ac)783 void Analyzer::add_command_to_completed_queue(AnalyzerCommand* ac)
784 {
785 completed_work_queue_mutex.lock();
786 completed_work_queue.push(ac);
787 completed_work_queue_mutex.unlock();
788 }
789
handle_commands()790 void Analyzer::handle_commands()
791 {
792 while (handle_command())
793 ;
794 }
795
handle_uncompleted_commands()796 void Analyzer::handle_uncompleted_commands()
797 {
798 std::list<UncompletedAnalyzerCommand*>::iterator it = uncompleted_work_queue.begin();
799 while (it != uncompleted_work_queue.end() )
800 {
801 UncompletedAnalyzerCommand* cac = *it;
802
803 if (cac->command->execute(*this, &cac->state) )
804 {
805 add_command_to_completed_queue(cac->command);
806 it = uncompleted_work_queue.erase(it);
807 delete cac;
808 }
809 else
810 ++it;
811 }
812 }
813
process_messages()814 DAQ_RecvStatus Analyzer::process_messages()
815 {
816 // Max receive becomes the minimum of the configured batch size, the remaining exit_after
817 // count (if requested), and the remaining pause_after count (if requested).
818 unsigned max_recv = daq_instance->get_batch_size();
819 if (exit_after_cnt && exit_after_cnt < max_recv)
820 max_recv = exit_after_cnt;
821 if (pause_after_cnt && pause_after_cnt < max_recv)
822 max_recv = pause_after_cnt;
823
824 DAQ_RecvStatus rstat;
825 {
826 Profile profile(daqPerfStats);
827 rstat = daq_instance->receive_messages(max_recv);
828 }
829
830 // Preemptively service available onloads to potentially unblock processing the first message.
831 // This conveniently handles servicing offloads in the no messages received case as well.
832 DetectionEngine::onload();
833
834 unsigned num_recv = 0;
835 DAQ_Msg_h msg;
836 while ((msg = daq_instance->next_message()) != nullptr)
837 {
838 // Dispose of any messages to be skipped first.
839 if (skip_cnt > 0)
840 {
841 Profile profile(daqPerfStats);
842 daq_stats.skipped++;
843 skip_cnt--;
844 daq_instance->finalize_message(msg, DAQ_VERDICT_PASS);
845 continue;
846 }
847 // FIXIT-M reimplement fail-open capability?
848 num_recv++;
849 // IMPORTANT: process_daq_msg() is responsible for finalizing the messages.
850 process_daq_msg(msg, false);
851 DetectionEngine::onload();
852 process_retry_queue();
853 handle_uncompleted_commands();
854 }
855
856 if (exit_after_cnt && (exit_after_cnt -= num_recv) == 0)
857 stop();
858 if (pause_after_cnt && (pause_after_cnt -= num_recv) == 0)
859 pause();
860 return rstat;
861 }
862
analyze()863 void Analyzer::analyze()
864 {
865 while (!exit_requested)
866 {
867 // If we're not in the running state (usually either pre-start or paused),
868 // just keep stalling until something else comes up.
869 if (state != State::RUNNING)
870 {
871 if (!handle_command())
872 {
873 chrono::milliseconds ms(10);
874 this_thread::sleep_for(ms);
875 }
876 continue;
877 }
878
879 // Receive and process a batch of messages. Evaluate the receive status after processing
880 // the returned messages to determine if we should immediately continue, take the opportunity
881 // to deal with some house cleaning work, or terminate the analyzer thread.
882 DAQ_RecvStatus rstat = process_messages();
883 if (rstat != DAQ_RSTAT_OK && rstat != DAQ_RSTAT_WOULD_BLOCK)
884 {
885 if (rstat == DAQ_RSTAT_TIMEOUT)
886 {
887 // If the receive timed out, let's do some idle work before continuing.
888 // FIXIT-L Hitting a one second timeout when attached to any real traffic source
889 // is extremely unlikely, so relying on anything in thread_idle() ever being
890 // called is dangerous.
891 idle();
892 }
893 else if (rstat == DAQ_RSTAT_INTERRUPTED)
894 {
895 // If the status reports INTERRUPTED because of an interrupt() call, exit_requested should
896 // be set for the next pass through the main loop. Use this as a hint to check for analyzer
897 // commands.
898 handle_commands();
899 }
900 else
901 {
902 if (rstat == DAQ_RSTAT_NOBUF)
903 ErrorMessage("Exhausted the DAQ message pool!\n");
904 else if (rstat == DAQ_RSTAT_ERROR)
905 ErrorMessage("Error receiving message from the DAQ instance: %s\n", daq_instance->get_error());
906 // Implicitly handled:
907 // DAQ_RSTAT_EOF - File readback completed, job well done; let's get out of here.
908 // DAQ_RSTAT_INVALID - This really shouldn't happen.
909 break;
910 }
911 }
912 }
913 }
914
start()915 void Analyzer::start()
916 {
917 assert(state == State::INITIALIZED);
918
919 if (!daq_instance->start())
920 {
921 ErrorMessage("Analyzer: Failed to start DAQ instance\n");
922 exit_requested = true;
923 }
924 set_state(State::STARTED);
925 }
926
run(bool paused)927 void Analyzer::run(bool paused)
928 {
929 assert(state == State::STARTED);
930 init_unprivileged();
931 if ( paused )
932 set_state(State::PAUSED);
933 else
934 set_state(State::RUNNING);
935 }
936
stop()937 void Analyzer::stop()
938 {
939 exit_requested = true;
940 }
941
pause()942 void Analyzer::pause()
943 {
944 if (state == State::RUNNING)
945 {
946 set_state(State::PAUSED);
947 LogMessage("== [%u] paused\n", id);
948 }
949 else
950 ErrorMessage("Analyzer: Received PAUSE command while in state %s\n",
951 get_state_string());
952 }
953
resume(uint64_t msg_cnt)954 void Analyzer::resume(uint64_t msg_cnt)
955 {
956 if (state == State::PAUSED)
957 {
958 set_pause_after_cnt(msg_cnt);
959 set_state(State::RUNNING);
960 }
961 else
962 ErrorMessage("Analyzer: Received RESUME command while in state %s\n",
963 get_state_string());
964 }
965
reload_daq()966 void Analyzer::reload_daq()
967 {
968 if (daq_instance)
969 daq_instance->reload();
970 }
971
rotate()972 void Analyzer::rotate()
973 {
974 DataBus::publish(THREAD_ROTATE_EVENT, nullptr);
975 }
976
977