1 /*
2 ** Copyright (C) 2001-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8
9 /*
10 ** rwuniq.c
11 **
12 ** Implementation of the rwuniq application.
13 **
14 ** rwuniq reads SiLK flow records---from files listed on the command
15 ** line or from the standard input when no filenames are given---and
16 ** bins those flows by a key composed of user-selected fields of an
17 ** rwRec, or by fields generated from a plug-in. For each bin, a
18 ** user-selected combination of bytes, packets, flows, earliest
19 ** start-time, latest end-time, distinct sIPs, and/or distinct dIPs
20 ** may be computed.
21 **
22 ** Once the input is read, the key fields and computed values are
23 ** printed for each bin that meets the user-specified minimum and
24 ** maximum.
25 **
26 ** Normally, rwuniq uses the hashlib hash table to store the
27 ** key-volume pairs for each bin. If this hash table runs out of
28 ** memory, the contents of the table are sorted and then saved to
29 ** disk in a temporary file. More records are then read into a fresh
30 ** hash table. The process repeats until all records are read or the
31 ** maximum number of temp files is reached. The on-disk files are
32 ** then merged to produce the final output.
33 **
34 ** When the --presorted-input switch is given, rwuniq assumes rwsort
35 ** has been used to sort the data with the same --fields value that
36 ** rwuniq is using. In this case, the hash table is not used.
37 ** Instead, rwuniq just watches for the key to change, and prints the
38 ** key-volume when it does.
39 **
40 ** For the --presorted-input case or when more than one distinct IP
41 ** count is requested for the unsorted case, an IPSet is used to keep
42 ** track of the IPs we have seen. Since IPSets do not yet support
43 ** IPv6, this limits rwuniq's ability when IPv6 is active. Also,
44 ** these IPSets can exhaust the ram, which would lead to an incorrect
45 ** count of IPs. Could consider using a hashlib instead of an IPSet
46 ** for the values to get around the IPv6 issue.
47 **
48 */
49
50 #include <silk/silk.h>
51
52 RCSIDENT("$SiLK: rwuniq.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");
53
54 #include "rwstats.h"
55
56
57 /* EXPORTED VARIABLES */
58
59 /* is this rwstats or rwuniq? */
60 const statsuniq_program_t this_program = STATSUNIQ_PROGRAM_UNIQ;
61
62
63 /* FUNCTION DEFINITIONS */
64
/*
 *  The next four functions are placeholders that exist only so that
 *  this program links against rwstatssetup.c.  Each one aborts when
 *  called.
 */

/*
 *  protoStatsOptionsRegister();
 *
 *    Link-time placeholder; takes no arguments and aborts via
 *    skAbort().
 */
int
protoStatsOptionsRegister(void)
{
    skAbort();
    /* NOTREACHED; present to satisfy the 'int' return type */
    return 0;
}
/*
 *  protoStatsOptionsUsage(fh);
 *
 *    Link-time placeholder.  The file handle 'fh' is ignored and the
 *    function aborts via skAbort().
 */
void
protoStatsOptionsUsage(FILE *fh)
{
    SK_UNUSED_PARAM(fh);
    skAbort();
}
80 int
legacyOptionsSetup(clientData cData)81 legacyOptionsSetup(
82 clientData cData)
83 {
84 SK_UNUSED_PARAM(cData);
85 skAbort();
86 return 0; /* NOTREACHED */
87 }
/*
 *  legacyOptionsUsage(fh);
 *
 *    Link-time placeholder.  The file handle 'fh' is ignored and the
 *    function aborts via skAbort().
 */
void
legacyOptionsUsage(FILE *fh)
{
    SK_UNUSED_PARAM(fh);
    skAbort();
}
95
96
97 /*
98 * writeColTitles();
99 *
100 * Enable the pager, and print the column titles to the global
101 * 'output.fp'.
102 */
103 static void
writeColTitles(void)104 writeColTitles(
105 void)
106 {
107 setOutputHandle();
108 rwAsciiPrintTitles(ascii_str);
109 }
110
111
112 /*
113 * uniqRandom();
114 *
115 * Main control function that creates a hash table, processes the
116 * input (files or stdin), and prints the results.
117 */
118 static void
uniqRandom(void)119 uniqRandom(
120 void)
121 {
122 sk_unique_iterator_t *iter;
123 uint8_t *outbuf[3];
124 skstream_t *stream;
125 rwRec rwrec;
126 int rv = 0;
127
128 while (0 == (rv = appNextInput(&stream))) {
129 while (SKSTREAM_OK == (rv = readRecord(stream, &rwrec))) {
130 if (0 != skUniqueAddRecord(uniq, &rwrec)) {
131 appExit(EXIT_FAILURE);
132 }
133 }
134 if (rv != SKSTREAM_ERR_EOF) {
135 skStreamPrintLastErr(stream, rv, &skAppPrintErr);
136 skStreamDestroy(&stream);
137 return;
138 }
139 skStreamDestroy(&stream);
140 }
141 if (rv == -1) {
142 /* error reading file */
143 appExit(EXIT_FAILURE);
144 }
145
146 /* Write out the headings */
147 writeColTitles();
148
149 skUniquePrepareForOutput(uniq);
150
151 /* create the iterator */
152 rv = skUniqueIteratorCreate(uniq, &iter);
153 if (rv) {
154 skAppPrintErr("Unable to create iterator; err = %d", rv);
155 appExit(EXIT_FAILURE);
156 }
157
158 if (app_flags.check_limits) {
159 while (skUniqueIteratorNext(iter, &outbuf[0], &outbuf[2], &outbuf[1])
160 == SK_ITERATOR_OK)
161 {
162 checkLimitsWriteRecord(outbuf);
163 }
164 } else {
165 while (skUniqueIteratorNext(iter, &outbuf[0], &outbuf[2], &outbuf[1])
166 == SK_ITERATOR_OK)
167 {
168 writeAsciiRecord(outbuf);
169 }
170 }
171
172 skUniqueIteratorDestroy(&iter);
173
174 return;
175 }
176
177
178 static int
presortedCheckLimitsCallback(const uint8_t * key,const uint8_t * distinct,const uint8_t * value,void UNUSED (* callback_data))179 presortedCheckLimitsCallback(
180 const uint8_t *key,
181 const uint8_t *distinct,
182 const uint8_t *value,
183 void UNUSED(*callback_data))
184 {
185 uint8_t *outbuf[3];
186
187 outbuf[0] = (uint8_t*)key;
188 outbuf[1] = (uint8_t*)value;
189 outbuf[2] = (uint8_t*)distinct;
190
191 checkLimitsWriteRecord(outbuf);
192 return 0;
193 }
194
195 static int
presortedEntryCallback(const uint8_t * key,const uint8_t * distinct,const uint8_t * value,void UNUSED (* callback_data))196 presortedEntryCallback(
197 const uint8_t *key,
198 const uint8_t *distinct,
199 const uint8_t *value,
200 void UNUSED(*callback_data))
201 {
202 uint8_t *outbuf[3];
203
204 outbuf[0] = (uint8_t*)key;
205 outbuf[1] = (uint8_t*)value;
206 outbuf[2] = (uint8_t*)distinct;
207
208 writeAsciiRecord(outbuf);
209 return 0;
210 }
211
212
213 /*
214 * uniqPresorted();
215 *
216 * Main control function that reads presorted flow records from
217 * files or stdin and prints the results.
218 */
219 static void
uniqPresorted(void)220 uniqPresorted(
221 void)
222 {
223 /* Write the headings */
224 writeColTitles();
225
226 if (app_flags.check_limits) {
227 if (skPresortedUniqueProcess(
228 ps_uniq, presortedCheckLimitsCallback, NULL))
229 {
230 skAppPrintErr("Unique processing failed");
231 }
232 } else {
233 if (skPresortedUniqueProcess(ps_uniq, presortedEntryCallback, NULL)) {
234 skAppPrintErr("Unique processing failed");
235 }
236 }
237 }
238
239
main(int argc,char ** argv)240 int main(int argc, char **argv)
241 {
242 /* Global setup */
243 appSetup(argc, argv);
244
245 if (app_flags.presorted_input) {
246 uniqPresorted();
247 } else {
248 uniqRandom();
249 }
250
251 /* Done, do cleanup */
252 appTeardown();
253
254 return 0;
255 }
256
257
258 /*
259 ** Local Variables:
260 ** mode:c
261 ** indent-tabs-mode:nil
262 ** c-basic-offset:4
263 ** End:
264 */
265