1 #include "jbase.h"
2 #include "jdevice.h"
3 #include "jprocessor.h"
4 #include "jresolver.h"
5 #include "jconfig.h"
6 #include "jutil.h"
7 #include "juiadisplay.h"
8
9 #define ENABLE_JNET
10 #ifdef ENABLE_JNET
11
12 // protocol
13 // HELLO <protocol-version>
14 // HELLO:OK <protocol-version>\t<features>
15 // SETFILTER "<filter>"
16 // SETFILTER:OK
17 // INTERFACE "<interface>"
18 // INTERFACE:OK
19 // RUN
20 // RUN:OK
21 // SOOB:S <statistics>
22 // SOOB:N <streamid>,...streamdata...
23 // SOOB:U <streamid>,...streamupdate...
24 // SOOB:D <streamid>
25 // SOOB:L <address>\t<name>
26 // SOOB:E <msg>
27 // STOP
28 // STOP:OK
29 // EXIT
30 // EXIT:OK
31
32 GScanner * inputScanner;
33 GScannerConfig scannerConfig = {
34 /* cset_skip_characters */ " \t",
35 /* cset_identifier_first */ G_CSET_A_2_Z,
36 /* cset_identifier_nth */ G_CSET_A_2_Z,
37 /* cpair_comment_single */ "",
38 /* case_sensitive */ TRUE,
39 /* skip_comment_multi */ FALSE,
40 /* skip_comment_single */ FALSE,
41 /* scan_comment_multi */ FALSE,
42 /* scan_identifier */ TRUE,
43 /* scan_identifier_1char */ TRUE,
44 /* scan_identifier_NULL */ FALSE,
45 /* scan_symbols */ TRUE,
46 /* scan_binary */ FALSE,
47 /* scan_octal */ FALSE,
48 /* scan_float */ FALSE,
49 /* scan_hex */ FALSE,
50 /* scan_hex_dollar */ FALSE,
51 /* scan_string_sq */ FALSE,
52 /* scan_string_dq */ TRUE,
53 /* numbers_2_int */ TRUE,
54 /* int_2_float */ FALSE,
55 /* identifier_2_string */ TRUE,
56 /* char_2_token */ TRUE,
57 /* symbol_2_token */ FALSE,
58 /* scope_0_fallback */ FALSE,
59 /* store_int64 */ FALSE,
60 /* padding_dummy */ 0 };
61
62 #define INPUT_STATE_INITIAL 0
63 #define INPUT_STATE_COMMAND 1
64 #define INPUT_STATE_RUNNING 2
65
66 #define RUNNING_STATE_DO_NOTHING 0
67 #define RUNNING_STATE_RUNNING 1
68
69 int inputState = INPUT_STATE_INITIAL;
70 volatile int runningState = RUNNING_STATE_DO_NOTHING;
71 GMutex *runningMutex;
72 GMutex *outputMutex;
73 GString *stringBuffer;
74 char *soobuMessageFormat, *soobnMessageFormat;
75
76 #define RECV_BUFFER_SIZE 1024
77 #define MAX_LINE_SIZE 1024
78 #define MAX_INTERFACENAME_LENGTH 64
readNextCommandLine()79 static gchar* readNextCommandLine() {
80 fd_set listenSet, exceptionSet;
81 struct timeval timeout;
82 int selectResult;
83 GString *lineBuffer;
84 char *eoln = NULL;
85
86 lineBuffer = g_string_sized_new(1024);
87
88 do {
89 int l;
90 char buffer[RECV_BUFFER_SIZE];
91
92 FD_ZERO(&listenSet);
93 FD_SET(fileno(stdin), &listenSet);
94 FD_ZERO(&exceptionSet);
95 FD_SET(fileno(stdin), &exceptionSet);
96 timeout.tv_sec = 10; // COMMAND TIMEOUT
97 timeout.tv_usec = 0;
98 selectResult = select(fileno(stdin)+1, &listenSet, NULL, &exceptionSet, &timeout);
99
100 if (selectResult == -1) {
101 g_string_free(lineBuffer, TRUE);
102 return NULL;
103 }
104
105 if (FD_ISSET(fileno(stdin), &exceptionSet)) {
106 g_string_free(lineBuffer, TRUE);
107 return NULL;
108 }
109
110 if (!FD_ISSET(fileno(stdin), &listenSet)) {
111 continue;
112 }
113
114 l = read(fileno(stdin), buffer, RECV_BUFFER_SIZE);
115 if (l <= 0) {
116 g_string_free(lineBuffer, TRUE);
117 return NULL;
118 }
119
120 if (lineBuffer->len + l > MAX_LINE_SIZE) {
121 g_string_free(lineBuffer, TRUE);
122 return NULL;
123 }
124
125 g_string_append_len(lineBuffer, buffer, l);
126 g_strdelimit(lineBuffer->str, "\r\n", '\n');
127 eoln = strchr(lineBuffer->str, '\n');
128 } while (eoln == NULL);
129
130 *eoln = '\0';
131 return g_string_free(lineBuffer, FALSE);
132 }
133
sendLine(const gchar * string)134 static void sendLine(const gchar *string) {
135 g_mutex_lock(outputMutex);
136 fprintf(stdout, "%s", string);
137 fflush(stdout);
138 g_mutex_unlock(outputMutex);
139 }
140
sendLinef(const gchar * formatString,...)141 static void sendLinef(const gchar *formatString, ...) {
142 va_list ap;
143 va_start(ap, formatString);
144 g_mutex_lock(outputMutex);
145 vfprintf(stdout, formatString, ap);
146 fflush(stdout);
147 g_mutex_unlock(outputMutex);
148 va_end(ap);
149 }
150
parseNextToken(GTokenType expectedTokenType,const char * errorMessage)151 static gboolean parseNextToken(GTokenType expectedTokenType, const char *errorMessage) {
152 GTokenType tt;
153 tt = g_scanner_get_next_token(inputScanner);
154 if (tt != expectedTokenType) {
155 sendLine(errorMessage);
156 return FALSE;
157 }
158 return TRUE;
159 }
160
161 #define parseNextString(errorMessage) parseNextToken(G_TOKEN_STRING, errorMessage)
162 #define parseNextInt(errorMessage) parseNextToken(G_TOKEN_INT, errorMessage)
163
processNextCommand()164 static gboolean processNextCommand() {
165 gchar *commandLine;
166 gboolean stayConnected = TRUE;
167 static char interfaceName[MAX_INTERFACENAME_LENGTH];
168
169 commandLine = readNextCommandLine();
170 if (commandLine == NULL)
171 return FALSE;
172
173 g_scanner_input_text(inputScanner, commandLine, strlen(commandLine));
174 if (!parseNextString("?:ERR Command expected.\n")) {
175 goto line_processed;
176 }
177
178 if (inputState == INPUT_STATE_INITIAL && !strcmp(inputScanner->value.v_string, "HELLO")) {
179 if (!parseNextInt("HELLO:ERR Version argument expected.\n")) {
180 goto line_processed;
181 }
182 if (inputScanner->value.v_int < 1) {
183 sendLine("HELLO:ERR Unsupported version.\n");
184 goto line_processed;
185 }
186 sendLine("HELLO:OK 1\n");
187 inputState = INPUT_STATE_COMMAND;
188 goto line_processed;
189 }
190
191 if (inputState == INPUT_STATE_COMMAND && !strcmp(inputScanner->value.v_string, "SETFILTER")) {
192 const char * filterValidationError;
193
194 if (!parseNextString("SETFILTER:ERR Filter expected.\n")) {
195 goto line_processed;
196 }
197 JCONFIG_BPFFILTERS_SETNONE;
198
199 filterValidationError = jutil_ValidateBPFFilter(inputScanner->value.v_string);
200 if (filterValidationError) {
201 sendLinef("SETFILTER:ERR Error parsing filter rule: %s.\n", filterValidationError);
202 goto line_processed;
203 }
204
205 JCONFIG_BPFFILTERS_SETSELECTEDFILTER(JCONFIG_BPFFILTERS_LEN);
206 jconfig_AddBpfFilter("<fromsetfilter>", g_strdup(inputScanner->value.v_string));
207 sendLine("SETFILTER:OK Filter set.\n");
208 goto line_processed;
209 }
210
211 if (inputState == INPUT_STATE_COMMAND && !strcmp(inputScanner->value.v_string, "INTERFACE")) {
212 if (!parseNextString("INTERFACE:ERR Device name expected.\n")) {
213 goto line_processed;
214 }
215
216 if (strlen(inputScanner->value.v_string) > MAX_INTERFACENAME_LENGTH-1) {
217 sendLine("INTERFACE:ERR Interface name too long.\n");
218 goto line_processed;
219 }
220
221 strcpy(interfaceName, inputScanner->value.v_string);
222 jconfig_SelectDevice(interfaceName);
223 sendLine("INTERFACE:OK Interface set.\n");
224 goto line_processed;
225 }
226
227 if (inputState == INPUT_STATE_COMMAND && !strcmp(inputScanner->value.v_string, "INTERFACES")) {
228 int i;
229 char buffer[256];
230
231 sendLine("INTERFACES:OK Interface list follows.\n");
232 for (i=0; i<jdevice_DevicesCount; i++) {
233 jbase_device *device = jdevice_Devices+i;
234 jutil_StorageAddress2String(&device->hwaddr, buffer, sizeof(buffer)-1);
235 sendLinef("INTERFACES:INFO %s\t%d\t%s\n", device->name, ((const struct sockaddr *)&device->hwaddr)->sa_family, buffer);
236 }
237 sendLine("INTERFACES:END End of list.\n");
238 goto line_processed;
239 }
240
241 if (inputState == INPUT_STATE_COMMAND && !strcmp(inputScanner->value.v_string, "RUN")) {
242 inputState = INPUT_STATE_RUNNING;
243 goto line_processed;
244 }
245
246 if (inputState == INPUT_STATE_RUNNING && !strcmp(inputScanner->value.v_string, "STOP")) {
247 inputState = INPUT_STATE_COMMAND;
248 goto line_processed;
249 }
250
251 if (!strcmp(inputScanner->value.v_string, "EXIT")) {
252 sendLine("EXIT:OK Good Bye.\n");
253 stayConnected = FALSE;
254 goto line_processed;
255 }
256
257 sendLinef("%s:ERR Unknown command.\n", inputScanner->value.v_string);
258
259 line_processed:
260 g_free(commandLine);
261 return stayConnected;
262 }
263
sendDeleteStream(GString * buffer,jbase_stream * s)264 static void sendDeleteStream(GString *buffer, jbase_stream *s) {
265 g_string_append_printf(buffer, "SOOB:D %08x%08x\n", (unsigned int)(s->uid>>32), (unsigned int)(s->uid&0xffffffff));
266 }
267
268 #define SOOBU_MESSAGEFORMAT "SOOB:U $uid$\t$srcbytes$\t$dstbytes$\t$totalbytes$\t$srcpackets$\t$dstpackets$\t$totalpackets$\t$srcbps$\t$dstbps$\t$totalbps$\t$srcpps$\t$dstpps$\t$totalpps$\t$filterdataifchanged$"
269 #define SOOBN_MESSAGEFORMAT "SOOB:N $uid$\t$src$\t$dst$\t$proto$\t$srcport$\t$dstport$"
270
sendStatistics(GString * buffer)271 static void sendStatistics(GString *buffer) {
272 g_string_append_printf(buffer, "SOOB:S %u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\n",
273 jprocessor_Stats.totalSrcBytes,
274 jprocessor_Stats.totalDstBytes,
275 jprocessor_Stats.totalBytes,
276 jprocessor_Stats.totalSrcPackets,
277 jprocessor_Stats.totalDstPackets,
278 jprocessor_Stats.totalPackets,
279 jprocessor_Stats.totalSrcBPS,
280 jprocessor_Stats.totalDstBPS,
281 jprocessor_Stats.totalBPS,
282 jprocessor_Stats.totalSrcPPS,
283 jprocessor_Stats.totalDstPPS,
284 jprocessor_Stats.totalPPS);
285 }
286
sendUpdateStream(GString * buffer,jbase_stream * s)287 static void sendUpdateStream(GString *buffer, jbase_stream *s) {
288 jutil_InterpretStreamFormat(buffer, soobuMessageFormat, s);
289 g_string_append_c(buffer, '\n');
290 s->filterDataLastDisplayChangeCount = s->filterDataChangeCount;
291 }
292
sendNewStream(GString * buffer,jbase_stream * s)293 static void sendNewStream(GString *buffer, jbase_stream *s) {
294 jutil_InterpretStreamFormat(buffer, soobnMessageFormat, s);
295 g_string_append_c(buffer, '\n');
296 }
297
resolvedNotifyFunc(jbase_resolv_entry * entry)298 static void resolvedNotifyFunc(jbase_resolv_entry *entry) {
299 gchar addr[INET6_ADDRSTRLEN + 1];
300
301 g_mutex_lock(runningMutex);
302 if (runningState == RUNNING_STATE_RUNNING) {
303 jutil_Address2String(entry->af, &entry->addr, addr, INET6_ADDRSTRLEN);
304 sendLinef("SOOB:L %s\t%s\n", addr, entry->name);
305 }
306 g_mutex_unlock(runningMutex);
307 }
308
processStreamsFunc(GPtrArray * streamArray)309 static void processStreamsFunc(GPtrArray * streamArray) {
310 guint i;
311
312 g_mutex_lock(runningMutex);
313 if (runningState == RUNNING_STATE_RUNNING) {
314 GString *buffer = g_string_sized_new(streamArray->len * 100);
315 sendStatistics(buffer);
316 for (i=0; i<streamArray->len; i++) {
317 jbase_stream *s = (jbase_stream *)g_ptr_array_index(streamArray, i);
318 if (s->dead && !s->displayed) {
319 continue;
320 }
321 if (s->dead) {
322 sendDeleteStream(buffer, s);
323 s->displayed = 0;
324 continue;
325 }
326 if (!s->displayed) {
327 s->displayed = 1;
328 sendNewStream(buffer, s);
329 }
330 sendUpdateStream(buffer, s);
331 }
332 sendLinef("%s", buffer->str);
333 g_string_free(buffer, TRUE);
334 }
335 sendLinef("SOOB:E Update finished.\n");
336 g_mutex_unlock(runningMutex);
337 }
338
jnetdisplay_PreSetup()339 static gboolean jnetdisplay_PreSetup() {
340 return TRUE;
341 }
342
jnetdisplay_Setup()343 static void jnetdisplay_Setup() {
344 setvbuf(stdin, NULL, _IOLBF, 0);
345 setvbuf(stdout, NULL, _IOLBF, 0);
346
347 inputScanner = g_scanner_new(&scannerConfig);
348 outputMutex = g_mutex_new();
349 runningMutex = g_mutex_new();
350 stringBuffer = g_string_new("");
351
352 soobuMessageFormat = g_strdup(SOOBU_MESSAGEFORMAT);
353 soobnMessageFormat = g_strdup(SOOBN_MESSAGEFORMAT);
354
355 jprocessor_Sorting = FALSE;
356
357 jprocessor_SetProcessStreamsFunc((ProcessStreamsFunc) processStreamsFunc);
358 jresolver_SetResolvedNotifyFunc((ResolvedNotifyFunc) resolvedNotifyFunc);
359 }
360
jnetdisplay_PreRunSetup()361 static gboolean jnetdisplay_PreRunSetup() {
362 do {
363 if (!processNextCommand()) {
364 return FALSE;
365 }
366 if (inputState == INPUT_STATE_RUNNING) {
367 return TRUE;
368 }
369 } while (TRUE);
370 }
371
jnetdisplay_PreRun()372 static void jnetdisplay_PreRun() {
373 }
374
jnetdisplay_Run()375 static gboolean jnetdisplay_Run() {
376 sendLine("RUN:OK Running...\n");
377 g_mutex_lock(runningMutex);
378 runningState = RUNNING_STATE_RUNNING;
379 g_mutex_unlock(runningMutex);
380 while (inputState == INPUT_STATE_RUNNING && processNextCommand()) {
381 }
382 g_mutex_lock(runningMutex);
383 runningState = RUNNING_STATE_DO_NOTHING;
384 g_mutex_unlock(runningMutex);
385 if (inputState == INPUT_STATE_COMMAND) {
386 sendLine("STOP:OK Stopped.\n");
387 }
388 return inputState == INPUT_STATE_COMMAND;
389 }
390
jnetdisplay_Shutdown()391 static void jnetdisplay_Shutdown() {
392 }
393
jnetdisplay_DrawStatus(const gchar * msg)394 static void jnetdisplay_DrawStatus(const gchar *msg) {
395 sendLinef("STATUS:OOB %s\n", msg);
396 }
397
jnetdisplay_ProcessArgument(const gchar ** arg,int argc)398 static int jnetdisplay_ProcessArgument(const gchar **arg, int argc) {
399 return 0;
400 }
401
402 jbase_display jnetdisplay_Functions = {
403 TRUE,
404 jnetdisplay_PreSetup,
405 jnetdisplay_Setup,
406 jnetdisplay_PreRunSetup,
407 jnetdisplay_PreRun,
408 jnetdisplay_Run,
409 jnetdisplay_Shutdown,
410 jnetdisplay_DrawStatus,
411 jnetdisplay_ProcessArgument
412 };
413
414 #else
415
416 jbase_display jnetdisplay_Functions = { FALSE };
417
418 #endif
419