1 /*
2 ** Copyright (C) 2001-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8 
9 /*
10 **  rwuniq.c
11 **
12 **  Implementation of the rwuniq application.
13 **
14 **  rwuniq reads SiLK flow records---from files listed on the command
15 **  line or from the standard input when no filenames are given---and
16 **  bins those flows by a key composed of user-selected fields of an
17 **  rwRec, or by fields generated from a plug-in.  For each bin, a
18 **  user-selected combination of bytes, packets, flows, earliest
19 **  start-time, latest end-time, distinct sIPs, and/or distinct dIPs
20 **  may be computed.
21 **
**  Once the input is read, the key fields and computed values are
**  printed for each bin whose values fall within the user-specified
**  minimum and maximum thresholds.
25 **
26 **  Normally, rwuniq uses the hashlib hash table to store the
27 **  key-volume pairs for each bin.  If this hash table runs out of
28 **  memory, the contents of the table are sorted and then saved to
29 **  disk in a temporary file.  More records are then read into a fresh
30 **  hash table.  The process repeats until all records are read or the
31 **  maximum number of temp files is reached.  The on-disk files are
32 **  then merged to produce the final output.
33 **
34 **  When the --presorted-input switch is given, rwuniq assumes rwsort
35 **  has been used to sort the data with the same --fields value that
36 **  rwuniq is using.  In this case, the hash table is not used.
37 **  Instead, rwuniq just watches for the key to change, and prints the
38 **  key-volume when it does.
39 **
40 **  For the --presorted-input case or when more than one distinct IP
41 **  count is requested for the unsorted case, an IPSet is used to keep
42 **  track of the IPs we have seen.  Since IPSets do not yet support
43 **  IPv6, this limits rwuniq's ability when IPv6 is active.  Also,
**  these IPSets can exhaust the RAM, which would lead to an incorrect
45 **  count of IPs.  Could consider using a hashlib instead of an IPSet
46 **  for the values to get around the IPv6 issue.
47 **
48 */
49 
50 #include <silk/silk.h>
51 
52 RCSIDENT("$SiLK: rwuniq.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");
53 
54 #include "rwstats.h"
55 
56 
57 /* EXPORTED VARIABLES */
58 
/* is this rwstats or rwuniq?  Identifies this executable as rwuniq to
 * the setup/processing code it shares with rwstats (see rwstats.h). */
const statsuniq_program_t this_program = STATSUNIQ_PROGRAM_UNIQ;
61 
62 
63 /* FUNCTION DEFINITIONS */
64 
/*
 *  Link-time stubs.
 *
 *    rwstatssetup.c references the following four functions, so
 *    rwuniq must define them in order to link.  They are not
 *    expected to be invoked in rwuniq; each one aborts if it is.
 */

/* rwstats protocol-statistics switch registration; aborts. */
int
protoStatsOptionsRegister(
    void)
{
    skAbort();
    return 0;                   /* NOTREACHED */
}
/* rwstats protocol-statistics usage printer; 'fh' is ignored and the
 * call aborts. */
void
protoStatsOptionsUsage(
    FILE               *fh)
{
    SK_UNUSED_PARAM(fh);
    skAbort();
}
80 int
legacyOptionsSetup(clientData cData)81 legacyOptionsSetup(
82     clientData          cData)
83 {
84     SK_UNUSED_PARAM(cData);
85     skAbort();
86     return 0;                   /* NOTREACHED */
87 }
/* rwstats legacy-switch usage printer; 'fh' is ignored and the call
 * aborts. */
void
legacyOptionsUsage(
    FILE               *fh)
{
    SK_UNUSED_PARAM(fh);
    skAbort();
}
95 
96 
97 /*
98  *  writeColTitles();
99  *
100  *    Enable the pager, and print the column titles to the global
101  *    'output.fp'.
102  */
103 static void
writeColTitles(void)104 writeColTitles(
105     void)
106 {
107     setOutputHandle();
108     rwAsciiPrintTitles(ascii_str);
109 }
110 
111 
112 /*
113  *  uniqRandom();
114  *
115  *    Main control function that creates a hash table, processes the
116  *    input (files or stdin), and prints the results.
117  */
118 static void
uniqRandom(void)119 uniqRandom(
120     void)
121 {
122     sk_unique_iterator_t *iter;
123     uint8_t *outbuf[3];
124     skstream_t *stream;
125     rwRec rwrec;
126     int rv = 0;
127 
128     while (0 == (rv = appNextInput(&stream))) {
129         while (SKSTREAM_OK == (rv = readRecord(stream, &rwrec))) {
130             if (0 != skUniqueAddRecord(uniq, &rwrec)) {
131                 appExit(EXIT_FAILURE);
132             }
133         }
134         if (rv != SKSTREAM_ERR_EOF) {
135             skStreamPrintLastErr(stream, rv, &skAppPrintErr);
136             skStreamDestroy(&stream);
137             return;
138         }
139         skStreamDestroy(&stream);
140     }
141     if (rv == -1) {
142         /* error reading file */
143         appExit(EXIT_FAILURE);
144     }
145 
146     /* Write out the headings */
147     writeColTitles();
148 
149     skUniquePrepareForOutput(uniq);
150 
151     /* create the iterator */
152     rv = skUniqueIteratorCreate(uniq, &iter);
153     if (rv) {
154         skAppPrintErr("Unable to create iterator; err = %d", rv);
155         appExit(EXIT_FAILURE);
156     }
157 
158     if (app_flags.check_limits) {
159         while (skUniqueIteratorNext(iter, &outbuf[0], &outbuf[2], &outbuf[1])
160                == SK_ITERATOR_OK)
161         {
162             checkLimitsWriteRecord(outbuf);
163         }
164     } else {
165         while (skUniqueIteratorNext(iter, &outbuf[0], &outbuf[2], &outbuf[1])
166                == SK_ITERATOR_OK)
167         {
168             writeAsciiRecord(outbuf);
169         }
170     }
171 
172     skUniqueIteratorDestroy(&iter);
173 
174     return;
175 }
176 
177 
178 static int
presortedCheckLimitsCallback(const uint8_t * key,const uint8_t * distinct,const uint8_t * value,void UNUSED (* callback_data))179 presortedCheckLimitsCallback(
180     const uint8_t          *key,
181     const uint8_t          *distinct,
182     const uint8_t          *value,
183     void            UNUSED(*callback_data))
184 {
185     uint8_t *outbuf[3];
186 
187     outbuf[0] = (uint8_t*)key;
188     outbuf[1] = (uint8_t*)value;
189     outbuf[2] = (uint8_t*)distinct;
190 
191     checkLimitsWriteRecord(outbuf);
192     return 0;
193 }
194 
195 static int
presortedEntryCallback(const uint8_t * key,const uint8_t * distinct,const uint8_t * value,void UNUSED (* callback_data))196 presortedEntryCallback(
197     const uint8_t          *key,
198     const uint8_t          *distinct,
199     const uint8_t          *value,
200     void            UNUSED(*callback_data))
201 {
202     uint8_t *outbuf[3];
203 
204     outbuf[0] = (uint8_t*)key;
205     outbuf[1] = (uint8_t*)value;
206     outbuf[2] = (uint8_t*)distinct;
207 
208     writeAsciiRecord(outbuf);
209     return 0;
210 }
211 
212 
213 /*
214  *  uniqPresorted();
215  *
216  *    Main control function that reads presorted flow records from
217  *    files or stdin and prints the results.
218  */
219 static void
uniqPresorted(void)220 uniqPresorted(
221     void)
222 {
223     /* Write the headings */
224     writeColTitles();
225 
226     if (app_flags.check_limits) {
227         if (skPresortedUniqueProcess(
228                 ps_uniq, presortedCheckLimitsCallback, NULL))
229         {
230             skAppPrintErr("Unique processing failed");
231         }
232     } else {
233         if (skPresortedUniqueProcess(ps_uniq, presortedEntryCallback, NULL)) {
234             skAppPrintErr("Unique processing failed");
235         }
236     }
237 }
238 
239 
main(int argc,char ** argv)240 int main(int argc, char **argv)
241 {
242     /* Global setup */
243     appSetup(argc, argv);
244 
245     if (app_flags.presorted_input) {
246         uniqPresorted();
247     } else {
248         uniqRandom();
249     }
250 
251     /* Done, do cleanup */
252     appTeardown();
253 
254     return 0;
255 }
256 
257 
258 /*
259 ** Local Variables:
260 ** mode:c
261 ** indent-tabs-mode:nil
262 ** c-basic-offset:4
263 ** End:
264 */
265