1 /*
2 Copyright 2021 Northern.tech AS
3
4 This file is part of CFEngine 3 - written and maintained by Northern.tech AS.
5
6 This program is free software; you can redistribute it and/or modify it
7 under the terms of the GNU General Public License as published by the
8 Free Software Foundation; version 3.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA
18
19 To the extent this program is licensed as part of the Enterprise
20 versions of CFEngine, the applicable Commercial Open Source License
21 (COSL) may apply to this file if you as a licensee so wish it. See
22 included file COSL.txt.
23 */
24
25
26 #include <env_monitor.h>
27
28 #include <eval_context.h>
29 #include <mon.h>
30 #include <granules.h>
31 #include <dbm_api.h>
32 #include <policy.h>
33 #include <promises.h>
34 #include <item_lib.h>
35 #include <conversion.h>
36 #include <ornaments.h>
37 #include <expand.h>
38 #include <scope.h>
39 #include <sysinfo.h>
40 #include <signals.h>
41 #include <locks.h>
42 #include <exec_tools.h>
43 #include <generic_agent.h> // WritePID
44 #include <files_lib.h>
45 #include <file_lib.h> // SetUmask()
46 #include <unix.h>
47 #include <verify_measurements.h>
48 #include <verify_classes.h>
49 #include <known_dirs.h>
50 #include <probes.h> /* MonOtherInit,MonOtherGatherData */
51 #include <history.h> /* HistoryUpdate */
52 #include <monitoring.h> /* GetObservable */
53 #include <cleanup.h>
54
55
56 /*****************************************************************************/
57 /* Globals */
58 /*****************************************************************************/
59
#define CF_ENVNEW_FILE "env_data.new" /* temporary file written by PublishEnvironment() and then renamed over CF_ENV_FILE */
#define cf_noise_threshold 6 /* absolute deviation from an average below which a change counts as noise, not as a large anomaly (see SetClasses/RejectAnomaly) */
#define MON_THRESHOLD_HIGH 1000000 // samples should stay below this threshold; RejectAnomaly() replaces larger ones by the average (and those above 4x by 0)
#define LDT_BUFSIZE 10 /* number of samples in each observable's leap-detection window */

/* Learning weight used by WAverage(); reset to 0.6 there when outside [0.1, 0.9].
   NOTE(review): not static, so presumably adjusted from another file - confirm. */
double FORGETRATE = 0.7;

/* Full paths of the temporary and the published environment file (built in MonitorInitialize). */
static char ENVFILE_NEW[CF_BUFSIZE] = "";
static char ENVFILE[CF_BUFSIZE] = "";

/* Sample distribution per [observable][day of week 0..6][grain]; loaded from and saved to "<statedir>/histograms". */
static double HISTOGRAM[CF_OBSERVABLES][7][CF_GRAINS] = { { { 0.0 } } };

/* persistent observations */

/* Latest sample of every observable; cleared by ZeroArrivals() and refilled by the probes each iteration. */
static double CF_THIS[CF_OBSERVABLES] = { 0.0 };

/* Work */

static long ITER = 0; /* Iteration since start; also the "age" WAverage() uses for LOCALAV */
static double AGE = 0.0, WAGE = 0.0; /* Age and weekly age of database: AGE counts stored samples (persisted as DATABASE_AGE), WAGE = AGE / SECONDS_PER_WEEK * CF_MEASURE_INTERVAL */

/* Averages accumulated locally since the daemon started (not read from the database). */
static Averages LOCALAV = { 0 };

/* Leap Detection vars: per-observable sliding window (ring buffer) of the
   last LDT_BUFSIZE samples plus the statistics LeapDetection() derives from it.
   LDT_POS is the slot of the most recent sample; LDT_FULL becomes true once
   the window has wrapped for the first time. */

static double LDT_BUF[CF_OBSERVABLES][LDT_BUFSIZE] = { { 0 } };
static double LDT_SUM[CF_OBSERVABLES] = { 0 };
static double LDT_AVG[CF_OBSERVABLES] = { 0 };
static double CHI_LIMIT[CF_OBSERVABLES] = { 0 };
static double CHI[CF_OBSERVABLES] = { 0 };
static double LDT_MAX[CF_OBSERVABLES] = { 0 };
static int LDT_POS = 0;
static int LDT_FULL = false;

/* When true, MonitorStartServer() stays in the foreground instead of daemonizing.
   NOTE(review): not static, presumably set from the command line elsewhere - confirm. */
int NO_FORK = false;

/*******************************************************************/
/* Prototypes: file-local helpers defined below                    */
/*******************************************************************/

static void GetDatabaseAge(void);
static void LoadHistogram(void);
static void GetQ(EvalContext *ctx, const Policy *policy);
static Averages EvalAvQ(EvalContext *ctx, char *timekey);
static void ArmClasses(EvalContext *ctx, const Averages *newvals);
static void GatherPromisedMeasures(EvalContext *ctx, const Policy *policy);

static void LeapDetection(void);
static Averages *GetCurrentAverages(char *timekey);
static void UpdateAverages(EvalContext *ctx, char *timekey, const Averages *newvals);
static void UpdateDistributions(EvalContext *ctx, char *timekey, Averages *av);
static double WAverage(double new_val, double old_val, double age);
static double SetClasses(EvalContext *ctx, char *name, double variable, double av_expect, double av_var, double localav_expect,
                         double localav_var, Item **classlist);
static void SetVariable(char *name, double now, double average, double stddev, Item **list);
static double RejectAnomaly(double new, double av, double var, double av2, double var2);
static void ZeroArrivals(void);
static PromiseResult KeepMonitorPromise(EvalContext *ctx, const Promise *pp, void *param);
118
119 /****************************************************************/
120
MonitorInitialize(void)121 void MonitorInitialize(void)
122 {
123 int i, j, k;
124 char vbuff[CF_BUFSIZE];
125 const char* const statedir = GetStateDir();
126
127 snprintf(vbuff, sizeof(vbuff), "%s/cf_users", statedir);
128 MapName(vbuff);
129 CreateEmptyFile(vbuff);
130
131 snprintf(ENVFILE_NEW, CF_BUFSIZE, "%s/%s", statedir, CF_ENVNEW_FILE);
132 MapName(ENVFILE_NEW);
133
134 snprintf(ENVFILE, CF_BUFSIZE, "%s/%s", statedir, CF_ENV_FILE);
135 MapName(ENVFILE);
136
137 MonEntropyClassesInit();
138
139 GetDatabaseAge();
140
141 for (i = 0; i < CF_OBSERVABLES; i++)
142 {
143 LOCALAV.Q[i] = QDefinite(0.0);
144 }
145
146 for (i = 0; i < CF_OBSERVABLES; i++)
147 {
148 for (j = 0; j < 7; j++)
149 {
150 for (k = 0; k < CF_GRAINS; k++)
151 {
152 HISTOGRAM[i][j][k] = 0;
153 }
154 }
155 }
156
157 for (i = 0; i < CF_OBSERVABLES; i++)
158 {
159 CHI[i] = 0;
160 CHI_LIMIT[i] = 0.1;
161 LDT_AVG[i] = 0;
162 LDT_SUM[i] = 0;
163 }
164
165 srand((unsigned int) time(NULL));
166 LoadHistogram();
167
168 /* Look for local sensors - this is unfortunately linux-centric */
169
170 MonNetworkInit();
171 MonTempInit();
172 MonOtherInit();
173
174 Log(LOG_LEVEL_DEBUG, "Finished with monitor initialization");
175 }
176
177 /*********************************************************************/
178 /* Level 2 */
179 /*********************************************************************/
180
GetDatabaseAge()181 static void GetDatabaseAge()
182 {
183 CF_DB *dbp;
184
185 if (!OpenDB(&dbp, dbid_observations))
186 {
187 return;
188 }
189
190 if (ReadDB(dbp, "DATABASE_AGE", &AGE, sizeof(double)))
191 {
192 WAGE = AGE / SECONDS_PER_WEEK * CF_MEASURE_INTERVAL;
193 Log(LOG_LEVEL_DEBUG, "Previous DATABASE_AGE %f", AGE);
194 }
195 else
196 {
197 Log(LOG_LEVEL_DEBUG, "No previous DATABASE_AGE");
198 AGE = 0.0;
199 }
200
201 CloseDB(dbp);
202 }
203
204 /*********************************************************************/
205
LoadHistogram(void)206 static void LoadHistogram(void)
207 {
208 int i, day, position;
209 double maxval[CF_OBSERVABLES];
210
211 char filename[CF_BUFSIZE];
212
213 snprintf(filename, CF_BUFSIZE, "%s%chistograms", GetStateDir(), FILE_SEPARATOR);
214
215 FILE *fp = safe_fopen(filename, "r");
216 if (fp == NULL)
217 {
218 Log(LOG_LEVEL_VERBOSE,
219 "Unable to load histogram data from '%s' (fopen: %s)",
220 filename, GetErrorStr());
221 return;
222 }
223
224 for (i = 0; i < CF_OBSERVABLES; i++)
225 {
226 maxval[i] = 1.0;
227 }
228
229 for (position = 0; position < CF_GRAINS; position++)
230 {
231 if (fscanf(fp, "%d ", &position) != 1)
232 {
233 Log(LOG_LEVEL_ERR, "Format error in histogram file '%s' - aborting", filename);
234 break;
235 }
236
237 for (i = 0; i < CF_OBSERVABLES; i++)
238 {
239 for (day = 0; day < 7; day++)
240 {
241 if (fscanf(fp, "%lf ", &(HISTOGRAM[i][day][position])) != 1)
242 {
243 Log(LOG_LEVEL_VERBOSE, "Format error in histogram file '%s'. (fscanf: %s)", filename, GetErrorStr());
244 HISTOGRAM[i][day][position] = 0;
245 }
246
247 if (HISTOGRAM[i][day][position] < 0)
248 {
249 HISTOGRAM[i][day][position] = 0;
250 }
251
252 if (HISTOGRAM[i][day][position] > maxval[i])
253 {
254 maxval[i] = HISTOGRAM[i][day][position];
255 }
256
257 HISTOGRAM[i][day][position] *= 1000.0 / maxval[i];
258 }
259 }
260 }
261
262 fclose(fp);
263 }
264
265 /*********************************************************************/
266
/*
 * Main loop of the monitor daemon.
 *
 * Unless NO_FORK is set, detaches into the background. On Windows it always
 * stays in the foreground. It then takes the lock for a synthetic
 * "the monitor daemon" promise, writes cf-monitord.pid, and repeatedly
 * samples, averages, leap-checks and publishes all observables until
 * termination is requested.
 *
 * @param ctx    evaluation context receiving the monitoring classes
 * @param policy policy whose monitor/common bundles supply extra measurements
 */
void MonitorStartServer(EvalContext *ctx, const Policy *policy)
{
    char timekey[CF_SMALLBUF];
    Averages averages;

    /* Synthetic promise, used as the handle passed to AcquireLock() below. */
    Policy *monitor_cfengine_policy = PolicyNew();
    Promise *pp = NULL;
    {
        Bundle *bp = PolicyAppendBundle(monitor_cfengine_policy, NamespaceDefault(), "monitor_cfengine_bundle", "agent", NULL, NULL);
        BundleSection *sp = BundleAppendSection(bp, "monitor_cfengine");

        pp = BundleSectionAppendPromise(sp, "the monitor daemon", (Rval) { NULL, RVAL_TYPE_NOPROMISEE }, NULL, NULL);
    }
    assert(pp);

    CfLock thislock;

#ifdef __MINGW32__

    if (!NO_FORK)
    {
        Log(LOG_LEVEL_VERBOSE, "Windows does not support starting processes in the background - starting in foreground");
    }

#else /* !__MINGW32__ */

    if (!NO_FORK)
    {
        const pid_t child = fork();

        if (child == -1)
        {
            /* A failed fork() must not be mistaken for the parent: that would
             * exit with success while no monitor daemon is running. */
            Log(LOG_LEVEL_ERR, "cf-monitord: unable to fork into the background (fork: %s)", GetErrorStr());
            PolicyDestroy(monitor_cfengine_policy);
            DoCleanupAndExit(EXIT_FAILURE);
        }

        if (child != 0)
        {
            Log(LOG_LEVEL_INFO, "cf-monitord: starting");
            _exit(EXIT_SUCCESS);
        }

        ActAsDaemon();
    }

#endif /* !__MINGW32__ */

    thislock = AcquireLock(ctx, pp->promiser, VUQNAME, CFSTARTTIME, 0, 0, pp, false);
    if (thislock.lock == NULL)
    {
        PolicyDestroy(monitor_cfengine_policy);
        return;
    }

    WritePID("cf-monitord.pid");

    MonNetworkSnifferOpen();

    while (!IsPendingTermination())
    {
        GetQ(ctx, policy);
        snprintf(timekey, sizeof(timekey), "%s", GenTimeKey(time(NULL)));
        averages = EvalAvQ(ctx, timekey);
        LeapDetection();
        ArmClasses(ctx, &averages);

        ZeroArrivals();

        MonNetworkSnifferSniff(EvalContextGetIpAddresses(ctx), ITER, CF_THIS);

        ITER++;
    }

    /* Release in reverse order of acquisition: the lock was taken for pp,
     * which is owned by monitor_cfengine_policy. */
    YieldCurrentLock(thislock);
    PolicyDestroy(monitor_cfengine_policy);
}
335
336 /*********************************************************************/
337
/*
 * Take one fresh sample of every observable into CF_THIS.
 *
 * Calls MonEntropyClassesReset() and clears CF_THIS (ZeroArrivals). It then
 * hands CF_THIS to each built-in probe family; CPU, load, network, sniffer
 * and temperature probes are only compiled on non-Windows builds.
 * GatherPromisedMeasures() comes last; it evaluates the policy's measurement
 * promises, which also receive CF_THIS (see KeepMonitorPromise).
 *
 * @param ctx    evaluation context
 * @param policy policy providing user-defined measurement promises
 */
static void GetQ(EvalContext *ctx, const Policy *policy)
{
    MonEntropyClassesReset();

    ZeroArrivals();

    MonProcessesGatherData(CF_THIS);
    MonDiskGatherData(CF_THIS);
#ifndef __MINGW32__
    MonCPUGatherData(CF_THIS);
    MonLoadGatherData(CF_THIS);
    MonNetworkGatherData(CF_THIS);
    MonNetworkSnifferGatherData();
    MonTempGatherData(CF_THIS);
#endif /* !__MINGW32__ */
    MonOtherGatherData(CF_THIS);
    GatherPromisedMeasures(ctx, policy);
}
356
357 /*********************************************************************/
358
EvalAvQ(EvalContext * ctx,char * t)359 static Averages EvalAvQ(EvalContext *ctx, char *t)
360 {
361 Averages *lastweek_vals, newvals;
362 double last5_vals[CF_OBSERVABLES];
363 char name[CF_MAXVARSIZE];
364 time_t now = time(NULL);
365 int i;
366
367 for (i = 0; i < CF_OBSERVABLES; i++)
368 {
369 last5_vals[i] = 0.0;
370 }
371
372 Banner("Evaluating and storing new weekly averages");
373
374 if ((lastweek_vals = GetCurrentAverages(t)) == NULL)
375 {
376 Log(LOG_LEVEL_ERR, "Error reading average database");
377 DoCleanupAndExit(EXIT_FAILURE);
378 }
379
380 /* Discard any apparently anomalous behaviour before renormalizing database */
381
382 for (i = 0; i < CF_OBSERVABLES; i++)
383 {
384 double delta2;
385 char desc[CF_BUFSIZE];
386 double This;
387 name[0] = '\0';
388 GetObservable(i, name, desc);
389
390 /* Overflow protection */
391
392 if (lastweek_vals->Q[i].expect < 0)
393 {
394 lastweek_vals->Q[i].expect = 0;
395 }
396
397 if (lastweek_vals->Q[i].q < 0)
398 {
399 lastweek_vals->Q[i].q = 0;
400 }
401
402 if (lastweek_vals->Q[i].var < 0)
403 {
404 lastweek_vals->Q[i].var = 0;
405 }
406
407 // lastweek_vals is last week's stored data
408
409 This =
410 RejectAnomaly(CF_THIS[i], lastweek_vals->Q[i].expect, lastweek_vals->Q[i].var, LOCALAV.Q[i].expect,
411 LOCALAV.Q[i].var);
412
413 newvals.Q[i].q = This;
414 newvals.last_seen = now; // Record the freshness of this slot
415
416 LOCALAV.Q[i].q = This;
417
418 Log(LOG_LEVEL_DEBUG, "Previous week's '%s.q' %lf", name, lastweek_vals->Q[i].q);
419 Log(LOG_LEVEL_DEBUG, "Previous week's '%s.var' %lf", name, lastweek_vals->Q[i].var);
420 Log(LOG_LEVEL_DEBUG, "Previous week's '%s.ex' %lf", name, lastweek_vals->Q[i].expect);
421
422 Log(LOG_LEVEL_DEBUG, "Just measured: CF_THIS[%s] = %lf", name, CF_THIS[i]);
423 Log(LOG_LEVEL_DEBUG, "Just sanitized: This[%s] = %lf", name, This);
424
425 newvals.Q[i].expect = WAverage(This, lastweek_vals->Q[i].expect, WAGE);
426 LOCALAV.Q[i].expect = WAverage(newvals.Q[i].expect, LOCALAV.Q[i].expect, ITER);
427
428 if (last5_vals[i] > 0)
429 {
430 newvals.Q[i].dq = newvals.Q[i].q - last5_vals[i];
431 LOCALAV.Q[i].dq = newvals.Q[i].q - last5_vals[i];
432 }
433 else
434 {
435 newvals.Q[i].dq = 0;
436 LOCALAV.Q[i].dq = 0;
437 }
438
439 // Save the last measured value as the value "from five minutes ago" to get the gradient
440 last5_vals[i] = newvals.Q[i].q;
441
442 delta2 = (This - lastweek_vals->Q[i].expect) * (This - lastweek_vals->Q[i].expect);
443
444 if (lastweek_vals->Q[i].var > delta2 * 2.0)
445 {
446 /* Clean up past anomalies */
447 newvals.Q[i].var = delta2;
448 LOCALAV.Q[i].var = WAverage(newvals.Q[i].var, LOCALAV.Q[i].var, ITER);
449 }
450 else
451 {
452 newvals.Q[i].var = WAverage(delta2, lastweek_vals->Q[i].var, WAGE);
453 LOCALAV.Q[i].var = WAverage(newvals.Q[i].var, LOCALAV.Q[i].var, ITER);
454 }
455
456 Log(LOG_LEVEL_VERBOSE, "[%d] %s q=%lf, var=%lf, ex=%lf", i, name,
457 newvals.Q[i].q, newvals.Q[i].var, newvals.Q[i].expect);
458
459 Log(LOG_LEVEL_VERBOSE, "[%d] = %lf -> (%lf#%lf) local [%lf#%lf]", i, This, newvals.Q[i].expect,
460 sqrt(newvals.Q[i].var), LOCALAV.Q[i].expect, sqrt(LOCALAV.Q[i].var));
461
462 if (This > 0)
463 {
464 Log(LOG_LEVEL_VERBOSE, "Storing %.2lf in %s", This, name);
465 }
466 }
467
468 UpdateAverages(ctx, t, &newvals);
469 UpdateDistributions(ctx, t, lastweek_vals); /* Distribution about mean */
470
471 return newvals;
472 }
473
474 /*********************************************************************/
475
LeapDetection(void)476 static void LeapDetection(void)
477 {
478 int i, last_pos = LDT_POS;
479 double n1, n2, d;
480 double padding = 0.2;
481
482 if (++LDT_POS >= LDT_BUFSIZE)
483 {
484 LDT_POS = 0;
485
486 if (!LDT_FULL)
487 {
488 Log(LOG_LEVEL_DEBUG, "LDT Buffer full at %d", LDT_BUFSIZE);
489 LDT_FULL = true;
490 }
491 }
492
493 for (i = 0; i < CF_OBSERVABLES; i++)
494 {
495 /* First do some anomaly rejection. Sudden jumps must be numerical errors. */
496
497 if ((LDT_BUF[i][last_pos] > 0) && ((CF_THIS[i] / LDT_BUF[i][last_pos]) > 1000))
498 {
499 CF_THIS[i] = LDT_BUF[i][last_pos];
500 }
501
502 /* Note AVG should contain n+1 but not SUM, hence funny increments */
503
504 LDT_AVG[i] = LDT_AVG[i] + CF_THIS[i] / ((double) LDT_BUFSIZE + 1.0);
505
506 d = (double) (LDT_BUFSIZE * (LDT_BUFSIZE + 1)) * LDT_AVG[i];
507
508 if (LDT_FULL && (LDT_POS == 0))
509 {
510 n2 = (LDT_SUM[i] - (double) LDT_BUFSIZE * LDT_MAX[i]);
511
512 if (d < 0.001)
513 {
514 CHI_LIMIT[i] = 0.5;
515 }
516 else
517 {
518 CHI_LIMIT[i] = padding + sqrt(n2 * n2 / d);
519 }
520
521 LDT_MAX[i] = 0.0;
522 }
523
524 if (CF_THIS[i] > LDT_MAX[i])
525 {
526 LDT_MAX[i] = CF_THIS[i];
527 }
528
529 n1 = (LDT_SUM[i] - (double) LDT_BUFSIZE * CF_THIS[i]);
530
531 if (d < 0.001)
532 {
533 CHI[i] = 0.0;
534 }
535 else
536 {
537 CHI[i] = sqrt(n1 * n1 / d);
538 }
539
540 LDT_AVG[i] = LDT_AVG[i] - LDT_BUF[i][LDT_POS] / ((double) LDT_BUFSIZE + 1.0);
541 LDT_BUF[i][LDT_POS] = CF_THIS[i];
542 LDT_SUM[i] = LDT_SUM[i] - LDT_BUF[i][LDT_POS] + CF_THIS[i];
543 }
544 }
545
546 /*********************************************************************/
547
/*
 * Publish the monitoring data to ENVFILE.
 *
 * Writes every item name of `classes` (one per line) followed by the entropy
 * classes into ENVFILE_NEW, which is created with umask 0077. The file is then
 * renamed over ENVFILE. If the temporary file cannot be opened or is not
 * written completely, the previously published ENVFILE is left untouched.
 *
 * @param classes list whose item names are written (may be NULL)
 */
static void PublishEnvironment(Item *classes)
{
    unlink(ENVFILE_NEW);

    const mode_t old_umask = SetUmask(0077);
    FILE *fp = safe_fopen(ENVFILE_NEW, "a");
    RestoreUmask(old_umask);
    if (fp == NULL)
    {
        return;
    }

    for (const Item *ip = classes; ip != NULL; ip = ip->next)
    {
        fprintf(fp, "%s\n", ip->name);
    }

    MonEntropyClassesPublish(fp);

    /* fclose() flushes buffered output, so both it and any earlier stream
     * error must be checked. Renaming a truncated file (e.g. disk full) would
     * replace the last complete environment with a partial one. */
    const bool write_error = (ferror(fp) != 0);
    if ((fclose(fp) != 0) || write_error)
    {
        unlink(ENVFILE_NEW);
        return;
    }

    rename(ENVFILE_NEW, ENVFILE);
}
571
572 /*********************************************************************/
573
/*
 * Append "@<name>=" followed by the printed item list `value` to mon_data.
 * Entries longer than 1500 characters are dropped silently.
 */
static void AddOpenPorts(const char *name, const Item *value, Item **mon_data)
{
    Writer *writer = StringWriter();

    WriterWriteF(writer, "@%s=", name);
    PrintItemList(value, writer);

    const bool fits = (StringWriterLength(writer) <= 1500);
    if (fits)
    {
        AppendItem(mon_data, StringWriterData(writer), NULL);
    }

    WriterClose(writer);
}
585
ArmClasses(EvalContext * ctx,const Averages * const av)586 static void ArmClasses(EvalContext *ctx, const Averages *const av)
587 {
588 assert(av != NULL);
589 double sigma;
590 Item *ip, *mon_data = NULL;
591 int i, j, k;
592 char buff[CF_BUFSIZE], ldt_buff[CF_BUFSIZE], name[CF_MAXVARSIZE];
593 static int anomaly[CF_OBSERVABLES][LDT_BUFSIZE] = { { 0 } };
594 extern Item *ALL_INCOMING;
595 extern Item *MON_UDP4, *MON_UDP6, *MON_TCP4, *MON_TCP6;
596
597 for (i = 0; i < CF_OBSERVABLES; i++)
598 {
599 char desc[CF_BUFSIZE];
600
601 GetObservable(i, name, desc);
602 sigma = SetClasses(ctx, name, CF_THIS[i], av->Q[i].expect, av->Q[i].var, LOCALAV.Q[i].expect, LOCALAV.Q[i].var, &mon_data);
603 SetVariable(name, CF_THIS[i], av->Q[i].expect, sigma, &mon_data);
604
605 /* LDT */
606
607 ldt_buff[0] = '\0';
608
609 anomaly[i][LDT_POS] = false;
610
611 if (!LDT_FULL)
612 {
613 anomaly[i][LDT_POS] = false;
614 }
615
616 if (LDT_FULL && (CHI[i] > CHI_LIMIT[i]))
617 {
618 anomaly[i][LDT_POS] = true; /* Remember the last anomaly value */
619
620 Log(LOG_LEVEL_VERBOSE, "LDT(%d) in %s chi = %.2f thresh %.2f ", LDT_POS, name, CHI[i], CHI_LIMIT[i]);
621
622 /* Last printed element is now */
623
624 for (j = LDT_POS + 1, k = 0; k < LDT_BUFSIZE; j++, k++)
625 {
626 if (j == LDT_BUFSIZE) /* Wrap */
627 {
628 j = 0;
629 }
630
631 if (anomaly[i][j])
632 {
633 snprintf(buff, CF_BUFSIZE, " *%.2f*", LDT_BUF[i][j]);
634 }
635 else
636 {
637 snprintf(buff, CF_BUFSIZE, " %.2f", LDT_BUF[i][j]);
638 }
639
640 strcat(ldt_buff, buff);
641 }
642
643 if (CF_THIS[i] > av->Q[i].expect)
644 {
645 snprintf(buff, CF_BUFSIZE, "%s_high_ldt", name);
646 }
647 else
648 {
649 snprintf(buff, CF_BUFSIZE, "%s_high_ldt", name);
650 }
651
652 AppendItem(&mon_data, buff, "2");
653 EvalContextHeapPersistentSave(ctx, buff, CF_PERSISTENCE, CONTEXT_STATE_POLICY_PRESERVE, "");
654 EvalContextClassPutSoft(ctx, buff, CONTEXT_SCOPE_NAMESPACE, "");
655 }
656 else
657 {
658 for (j = LDT_POS + 1, k = 0; k < LDT_BUFSIZE; j++, k++)
659 {
660 if (j == LDT_BUFSIZE) /* Wrap */
661 {
662 j = 0;
663 }
664
665 if (anomaly[i][j])
666 {
667 snprintf(buff, CF_BUFSIZE, " *%.2f*", LDT_BUF[i][j]);
668 }
669 else
670 {
671 snprintf(buff, CF_BUFSIZE, " %.2f", LDT_BUF[i][j]);
672 }
673 strcat(ldt_buff, buff);
674 }
675 }
676 }
677
678 SetMeasurementPromises(&mon_data);
679
680 // Report on the open ports, in various ways
681
682 AddOpenPorts("listening_ports", ALL_INCOMING, &mon_data);
683 AddOpenPorts("listening_udp6_ports", MON_UDP6, &mon_data);
684 AddOpenPorts("listening_udp4_ports", MON_UDP4, &mon_data);
685 AddOpenPorts("listening_tcp6_ports", MON_TCP6, &mon_data);
686 AddOpenPorts("listening_tcp4_ports", MON_TCP4, &mon_data);
687
688 // Port addresses
689
690 if (ListLen(MON_TCP6) + ListLen(MON_TCP4) > 512)
691 {
692 Log(LOG_LEVEL_INFO, "Disabling address information of TCP ports in LISTEN state: more than 512 listening ports are detected");
693 }
694 else
695 {
696 for (ip = MON_TCP6; ip != NULL; ip=ip->next)
697 {
698 snprintf(buff,CF_BUFSIZE,"tcp6_port_addr[%s]=%s",ip->name,ip->classes);
699 AppendItem(&mon_data, buff, NULL);
700 }
701
702 for (ip = MON_TCP4; ip != NULL; ip=ip->next)
703 {
704 snprintf(buff,CF_BUFSIZE,"tcp4_port_addr[%s]=%s",ip->name,ip->classes);
705 AppendItem(&mon_data, buff, NULL);
706 }
707 }
708
709 for (ip = MON_UDP6; ip != NULL; ip=ip->next)
710 {
711 snprintf(buff,CF_BUFSIZE,"udp6_port_addr[%s]=%s",ip->name,ip->classes);
712 AppendItem(&mon_data, buff, NULL);
713 }
714
715 for (ip = MON_UDP4; ip != NULL; ip=ip->next)
716 {
717 snprintf(buff,CF_BUFSIZE,"udp4_port_addr[%s]=%s",ip->name,ip->classes);
718 AppendItem(&mon_data, buff, NULL);
719 }
720
721 PublishEnvironment(mon_data);
722
723 DeleteItemList(mon_data);
724 }
725
726 /*****************************************************************************/
727
GetCurrentAverages(char * timekey)728 static Averages *GetCurrentAverages(char *timekey)
729 {
730 CF_DB *dbp;
731 static Averages entry; /* No need to initialize */
732
733 if (!OpenDB(&dbp, dbid_observations))
734 {
735 return NULL;
736 }
737
738 memset(&entry, 0, sizeof(entry));
739
740 AGE++;
741 WAGE = AGE / SECONDS_PER_WEEK * CF_MEASURE_INTERVAL;
742
743 if (ReadDB(dbp, timekey, &entry, sizeof(Averages)))
744 {
745 int i;
746
747 for (i = 0; i < CF_OBSERVABLES; i++)
748 {
749 Log(LOG_LEVEL_DEBUG, "Previous values (%lf,..) for time index '%s'", entry.Q[i].expect, timekey);
750 }
751 }
752 else
753 {
754 Log(LOG_LEVEL_DEBUG, "No previous value for time index '%s'", timekey);
755 }
756
757 CloseDB(dbp);
758 return &entry;
759 }
760
761 /*****************************************************************************/
762
/*
 * Store `newvals` under `timekey` and the current AGE under "DATABASE_AGE" in
 * the observations database, then pass the values to HistoryUpdate().
 * Nothing happens if the database cannot be opened.
 */
static void UpdateAverages(EvalContext *ctx, char *timekey, const Averages *const newvals)
{
    assert(newvals != NULL);

    CF_DB *db;
    if (OpenDB(&db, dbid_observations))
    {
        Log(LOG_LEVEL_INFO, "Updated averages at '%s'", timekey);

        WriteDB(db, timekey, newvals, sizeof(Averages));
        WriteDB(db, "DATABASE_AGE", &AGE, sizeof(double));

        CloseDB(db);
        HistoryUpdate(ctx, newvals);
    }
}
781
Day2Number(const char * datestring)782 static int Day2Number(const char *datestring)
783 {
784 int i = 0;
785
786 for (i = 0; i < 7; i++)
787 {
788 if (strncmp(datestring, DAY_TEXT[i], 3) == 0)
789 {
790 return i;
791 }
792 }
793
794 return -1;
795 }
796
/*
 * Add the current samples to the per-day histograms and save them to
 * "<statedir>/histograms".
 *
 * Only runs while the class Min40_45 is defined (presumably minutes 40-45 of
 * each hour - confirm). The day is taken from the start of `timekey`.
 *
 * @param ctx     evaluation context (class lookup)
 * @param timekey time key of the current slot; must start with a day name
 * @param av      averages the samples are centred and scaled on
 */
static void UpdateDistributions(EvalContext *ctx, char *timekey, Averages *av)
{
    char filename[CF_BUFSIZE];

    /* Take an interval of 4 standard deviations from -2 to +2, divided into CF_GRAINS
       parts. Centre each measurement on CF_GRAINS/2 and scale each measurement by the
       std-deviation for the current time.
    */

    if (!IsDefinedClass(ctx, "Min40_45"))
    {
        return;
    }

    const int day = Day2Number(timekey);
    if (day < 0)
    {
        /* Day2Number() signals "no match" with -1, which must not be used as an index. */
        Log(LOG_LEVEL_ERR, "Unable to determine the day of the week from time key '%s', not updating histograms", timekey);
        return;
    }

    for (int i = 0; i < CF_OBSERVABLES; i++)
    {
        /* A zero variance or an extreme sample yields an infinite or NaN
         * offset, and converting that to int is undefined behaviour. Any
         * |offset| >= CF_GRAINS would land outside the histogram anyway, so
         * filter before the cast (the negated test also rejects NaN). */
        const double offset = 0.5 + (CF_THIS[i] - av->Q[i].expect) * CF_GRAINS / (4 * sqrt((av->Q[i].var)));
        if (!(fabs(offset) < (double) CF_GRAINS))
        {
            continue;
        }

        const int position = CF_GRAINS / 2 + (int) offset;

        if ((0 <= position) && (position < CF_GRAINS))
        {
            HISTOGRAM[i][day][position]++;
        }
    }

    snprintf(filename, CF_BUFSIZE, "%s%chistograms", GetStateDir(), FILE_SEPARATOR);

    FILE *fp = safe_fopen(filename, "w");
    if (fp == NULL)
    {
        Log(LOG_LEVEL_ERR, "Unable to save histograms to '%s' (fopen: %s)", filename, GetErrorStr());
        return;
    }

    /* One row per grain: index, then observable-major / day-minor counts. */
    for (int position = 0; position < CF_GRAINS; position++)
    {
        fprintf(fp, "%d ", position);

        for (int i = 0; i < CF_OBSERVABLES; i++)
        {
            for (int d = 0; d < 7; d++)
            {
                fprintf(fp, "%.0lf ", HISTOGRAM[i][d][position]);
            }
        }
        fprintf(fp, "\n");
    }

    if (fclose(fp) != 0)
    {
        Log(LOG_LEVEL_ERR, "Unable to save histograms to '%s' (fclose: %s)", filename, GetErrorStr());
    }
}
848
849 /*****************************************************************************/
850
851 /*
852 This function performs a weighted average of an old and a new measured
853 value. Weights depend on the age of the data. If one or both values
854 are "unreasonably" large (>9999999) they will be ignored.
855 */
856 /* For a couple of weeks, learn eagerly. Otherwise variances will
857 be way too large. Then downplay newer data somewhat, and rely on
858 experience of a couple of months of data ... */
859
WAverage(double new_val,double old_val,double age)860 static double WAverage(double new_val, double old_val, double age)
861 {
862 const double cf_sane_monitor_limit = 9999999.0;
863 double average, weight_new, weight_old;
864
865 // First do some database corruption self-healing
866 const bool old_bad = (old_val > cf_sane_monitor_limit);
867 const bool new_bad = (new_val > cf_sane_monitor_limit);
868 if (old_bad && new_bad)
869 {
870 return 0.0;
871 }
872 else if (old_bad)
873 {
874 return new_val;
875 }
876 else if (new_bad)
877 {
878 return old_val;
879 }
880
881 // Now look at the self-learning
882 if ((FORGETRATE > 0.9) || (FORGETRATE < 0.1))
883 {
884 FORGETRATE = 0.6;
885 }
886
887 // More aggressive learning for young database
888 if (age < 2.0)
889 {
890 weight_new = FORGETRATE;
891 weight_old = (1.0 - FORGETRATE);
892 }
893 else
894 {
895 weight_new = (1.0 - FORGETRATE);
896 weight_old = FORGETRATE;
897 }
898
899 if ((old_val == 0) && (new_val == 0))
900 {
901 return 0.0;
902 }
903
904 /*
905 * Average = (w1*v1 + w2*v2) / (w1 + w2)
906 *
907 * w1 + w2 always equals to 1, so we omit it for better precision and
908 * performance.
909 */
910
911 average = (weight_new * new_val + weight_old * old_val);
912
913 if (average < 0)
914 {
915 /* Accuracy lost - something wrong */
916 return 0.0;
917 }
918
919 return average;
920 }
921
922 /*****************************************************************************/
923
/* Direction part of an observable's class name: "_high"/"_low" only when the
 * weekly and the local deltas agree in sign, "_normal" otherwise. */
static const char *EnvMonDirectionSuffix(double delta, double ldelta)
{
    if ((delta > 0) && (ldelta > 0))
    {
        return "_high";
    }
    if ((delta < 0) && (ldelta < 0))
    {
        return "_low";
    }
    return "_normal";
}

/* Append "<stem><suffix>" to classlist with the given weight and define it as
 * a persistent (CF_PERSISTENCE), namespace-scoped soft class. */
static void EnvMonRaisePersistentClass(EvalContext *ctx, const char *stem, const char *suffix,
                                       const char *weight, Item **classlist)
{
    char class_name[CF_BUFSIZE];

    snprintf(class_name, sizeof(class_name), "%s%s", stem, suffix);
    AppendItem(classlist, class_name, weight);
    EvalContextHeapPersistentSave(ctx, class_name, CF_PERSISTENCE, CONTEXT_STATE_POLICY_PRESERVE, "");
    EvalContextClassPutSoft(ctx, class_name, CONTEXT_SCOPE_NAMESPACE, "");
}

/*
 * Derive the deviation classes of one observable and append them to classlist.
 *
 * `variable` is compared with the weekly average (av_expect/av_var) and with
 * the local average (localav_expect/localav_var). The class stem is
 * "<name>_high", "<name>_low" or "<name>_normal". Then:
 *   - if either variance is zero, no class is set and MonEntropyPurgeUnused()
 *     is called with "entropy_<name>.*";
 *   - if |delta| < cf_noise_threshold, the stem itself is appended, plus a
 *     persistent "<stem>_microanomaly" class when dev > 2*sqrt(2);
 *   - otherwise "<stem>_normal" (dev <= sqrt(2)) or "<stem>_dev1" is appended,
 *     plus persistent "<stem>_dev2" (dev > 2*sqrt(2)) and "<stem>_anomaly"
 *     (dev > 3*sqrt(2)) classes.
 * dev combines both deltas, each normalised by (1 + variance).
 *
 * @return sqrt(sigma^2 + lsigma^2), the combined standard deviation
 */
static double SetClasses(EvalContext *ctx, char *name, double variable, double av_expect, double av_var, double localav_expect,
                         double localav_var, Item **classlist)
{
    const double delta = variable - av_expect;
    const double sigma = sqrt(av_var);
    const double ldelta = variable - localav_expect;
    const double lsigma = sqrt(localav_var);
    const double sig = sqrt(sigma * sigma + lsigma * lsigma);

    Log(LOG_LEVEL_DEBUG, "delta = %lf, sigma = %lf, lsigma = %lf, sig = %lf", delta, sigma, lsigma, sig);

    if ((sigma == 0.0) || (lsigma == 0.0))
    {
        char pattern[CF_BUFSIZE];

        Log(LOG_LEVEL_DEBUG, "No sigma variation .. can't measure class");
        snprintf(pattern, CF_MAXVARSIZE, "entropy_%s.*", name);
        MonEntropyPurgeUnused(pattern);
        return sig;
    }

    Log(LOG_LEVEL_DEBUG, "Setting classes for '%s'...", name);

    const bool within_noise = (fabs(delta) < cf_noise_threshold); /* Arbitrary limits on sensitivity */
    if (within_noise)
    {
        Log(LOG_LEVEL_DEBUG, "Sensitivity too high");
    }

    char stem[CF_BUFSIZE];
    snprintf(stem, sizeof(stem), "%s%s", name, EnvMonDirectionSuffix(delta, ldelta));

    const double dev = sqrt(delta * delta / (1.0 + sigma * sigma) + ldelta * ldelta / (1.0 + lsigma * lsigma));

    if (within_noise)
    {
        AppendItem(classlist, stem, "0");

        if (dev > 2.0 * sqrt(2.0))
        {
            EnvMonRaisePersistentClass(ctx, stem, "_microanomaly", "2", classlist);
        }

        return sig; /* Granularity makes this silly */
    }

    char graded[CF_BUFSIZE];
    snprintf(graded, sizeof(graded), "%s%s", stem, (dev <= sqrt(2.0)) ? "_normal" : "_dev1");
    AppendItem(classlist, graded, "0");

    /* Now use persistent classes so that serious anomalies last for about
       2 autocorrelation lengths, so that they can be cross correlated and
       seen by normally scheduled cfagent processes ... */

    if (dev > 2.0 * sqrt(2.0))
    {
        EnvMonRaisePersistentClass(ctx, stem, "_dev2", "2", classlist);
    }

    if (dev > 3.0 * sqrt(2.0))
    {
        EnvMonRaisePersistentClass(ctx, stem, "_anomaly", "3", classlist);
    }

    return sig;
}
1043
1044 /*****************************************************************************/
1045
/*
 * Append three "variable=value" entries for observable `name` to classlist:
 * value_<name>, av_<name> and dev_<name>, each printed with two decimals and
 * truncated to CF_MAXVARSIZE characters.
 */
static void SetVariable(char *name, double value, double average, double stddev, Item **classlist)
{
    char entry[CF_BUFSIZE];

    snprintf(entry, CF_MAXVARSIZE, "value_%s=%.2lf", name, value);
    AppendItem(classlist, entry, "");

    snprintf(entry, CF_MAXVARSIZE, "av_%s=%.2lf", name, average);
    AppendItem(classlist, entry, "");

    snprintf(entry, CF_MAXVARSIZE, "dev_%s=%.2lf", name, stddev);
    AppendItem(classlist, entry, "");
}
1059
1060 /*****************************************************************************/
1061
ZeroArrivals()1062 static void ZeroArrivals()
1063 {
1064 memset(CF_THIS, 0, sizeof(CF_THIS));
1065 }
1066
1067 /*****************************************************************************/
1068
RejectAnomaly(double new,double average,double variance,double localav,double localvar)1069 static double RejectAnomaly(double new, double average, double variance, double localav, double localvar)
1070 {
1071 double dev = sqrt(variance + localvar); /* Geometrical average dev */
1072 double delta;
1073 int bigger;
1074
1075 if (average == 0)
1076 {
1077 return new;
1078 }
1079
1080 if (new > MON_THRESHOLD_HIGH * 4.0)
1081 {
1082 return 0.0;
1083 }
1084
1085 if (new > MON_THRESHOLD_HIGH)
1086 {
1087 return average;
1088 }
1089
1090 if ((new - average) * (new - average) < cf_noise_threshold * cf_noise_threshold)
1091 {
1092 return new;
1093 }
1094
1095 if (new - average > 0)
1096 {
1097 bigger = true;
1098 }
1099 else
1100 {
1101 bigger = false;
1102 }
1103
1104 /* This routine puts some inertia into the changes, so that the system
1105 doesn't respond to every little change ... IR and UV cutoff */
1106
1107 delta = sqrt((new - average) * (new - average) + (new - localav) * (new - localav));
1108
1109 if (delta > 4.0 * dev) /* IR */
1110 {
1111 srand48((unsigned int) time(NULL));
1112
1113 if (drand48() < 0.7) /* 70% chance of using full value - as in learning policy */
1114 {
1115 return new;
1116 }
1117 else
1118 {
1119 if (bigger)
1120 {
1121 return average + 2.0 * dev;
1122 }
1123 else
1124 {
1125 return average - 2.0 * dev;
1126 }
1127 }
1128 }
1129 else
1130 {
1131 Log(LOG_LEVEL_VERBOSE, "Value accepted");
1132 return new;
1133 }
1134 }
1135
1136 /***************************************************************/
1137 /* Level 5 */
1138 /***************************************************************/
1139
/*
 * Evaluate the promises of every monitor and common bundle of `policy` with
 * KeepMonitorPromise(). Each bundle (and each visited section) gets its own
 * stack frame. Afterwards the evaluation context is cleared and the
 * environment re-detected.
 */
static void GatherPromisedMeasures(EvalContext *ctx, const Policy *policy)
{
    const char *const monitor_type = CF_AGENTTYPES[AGENT_TYPE_MONITOR];
    const char *const common_type = CF_AGENTTYPES[AGENT_TYPE_COMMON];

    for (size_t b = 0; b < SeqLength(policy->bundles); b++)
    {
        const Bundle *bundle = SeqAt(policy->bundles, b);
        const bool relevant = (strcmp(bundle->type, monitor_type) == 0) ||
                              (strcmp(bundle->type, common_type) == 0);

        EvalContextStackPushBundleFrame(ctx, bundle, NULL, false);

        for (size_t s = 0; relevant && (s < SeqLength(bundle->sections)); s++)
        {
            BundleSection *section = SeqAt(bundle->sections, s);

            EvalContextStackPushBundleSectionFrame(ctx, section);
            for (size_t p = 0; p < SeqLength(section->promises); p++)
            {
                Promise *promise = SeqAt(section->promises, p);
                ExpandPromise(ctx, promise, KeepMonitorPromise, NULL);
            }
            EvalContextStackPopFrame(ctx);
        }

        EvalContextStackPopFrame(ctx);
    }

    EvalContextClear(ctx);
    DetectEnvironment(ctx);
}
1169
1170 /*********************************************************************/
1171 /* Level */
1172 /*********************************************************************/
1173
KeepMonitorPromise(EvalContext * ctx,const Promise * pp,ARG_UNUSED void * param)1174 static PromiseResult KeepMonitorPromise(EvalContext *ctx, const Promise *pp, ARG_UNUSED void *param)
1175 {
1176 assert(param == NULL);
1177
1178 if (strcmp("vars", PromiseGetPromiseType(pp)) == 0)
1179 {
1180 return PROMISE_RESULT_NOOP;
1181 }
1182 else if (strcmp("classes", PromiseGetPromiseType(pp)) == 0)
1183 {
1184 return VerifyClassPromise(ctx, pp, NULL);
1185 }
1186 else if (strcmp("measurements", PromiseGetPromiseType(pp)) == 0)
1187 {
1188 PromiseResult result = VerifyMeasurementPromise(ctx, CF_THIS, pp);
1189 return result;
1190 }
1191 else if (strcmp("reports", PromiseGetPromiseType(pp)) == 0)
1192 {
1193 return PROMISE_RESULT_NOOP;
1194 }
1195
1196 assert(false && "Unknown promise type");
1197 return PROMISE_RESULT_NOOP;
1198 }
1199