1 #include "jbase.h"
2 #include "jdevice.h"
3 #include "jprocessor.h"
4 #include "jresolver.h"
5 #include "jconfig.h"
6 #include "jutil.h"
7 #include "juiadisplay.h"
8 
9 #define ENABLE_JNET
10 #ifdef ENABLE_JNET
11 
12 // protocol
13 //   HELLO <protocol-version>
14 //   HELLO:OK <protocol-version>\t<features>
15 //   SETFILTER "<filter>"
16 //   SETFILTER:OK
17 //   INTERFACE "<interface>"
18 //   INTERFACE:OK
19 //   RUN
20 //   RUN:OK
21 //   SOOB:S <statistics>
22 //   SOOB:N <streamid>,...streamdata...
23 //   SOOB:U <streamid>,...streamupdate...
24 //   SOOB:D <streamid>
25 //   SOOB:L <address>\t<name>
26 //   SOOB:E <msg>
27 //   STOP
28 //   STOP:OK
29 //   EXIT
30 //   EXIT:OK
31 
32 GScanner	* inputScanner;
33 GScannerConfig	scannerConfig = {
34 	/* cset_skip_characters */ " \t",
35 	/* cset_identifier_first */ G_CSET_A_2_Z,
36 	/* cset_identifier_nth */ G_CSET_A_2_Z,
37 	/* cpair_comment_single */ "",
38 	/* case_sensitive */ TRUE,
39 	/* skip_comment_multi */ FALSE,
40 	/* skip_comment_single */ FALSE,
41 	/* scan_comment_multi */ FALSE,
42 	/* scan_identifier */ TRUE,
43 	/* scan_identifier_1char */ TRUE,
44 	/* scan_identifier_NULL */ FALSE,
45 	/* scan_symbols */ TRUE,
46 	/* scan_binary */ FALSE,
47 	/* scan_octal */ FALSE,
48 	/* scan_float */ FALSE,
49 	/* scan_hex */ FALSE,
50 	/* scan_hex_dollar */ FALSE,
51 	/* scan_string_sq */ FALSE,
52 	/* scan_string_dq */ TRUE,
53 	/* numbers_2_int */ TRUE,
54 	/* int_2_float */ FALSE,
55 	/* identifier_2_string */ TRUE,
56 	/* char_2_token */ TRUE,
57 	/* symbol_2_token */ FALSE,
58 	/* scope_0_fallback */ FALSE,
59 	/* store_int64 */ FALSE,
60 	/* padding_dummy */ 0 };
61 
62 #define		INPUT_STATE_INITIAL	0
63 #define		INPUT_STATE_COMMAND	1
64 #define		INPUT_STATE_RUNNING	2
65 
66 #define		RUNNING_STATE_DO_NOTHING	0
67 #define		RUNNING_STATE_RUNNING		1
68 
69 int		inputState = INPUT_STATE_INITIAL;
70 volatile int	runningState = RUNNING_STATE_DO_NOTHING;
71 GMutex		*runningMutex;
72 GMutex		*outputMutex;
73 GString		*stringBuffer;
74 char		*soobuMessageFormat, *soobnMessageFormat;
75 
76 #define RECV_BUFFER_SIZE		1024
77 #define MAX_LINE_SIZE			1024
78 #define MAX_INTERFACENAME_LENGTH	64
readNextCommandLine()79 static gchar* readNextCommandLine() {
80 	fd_set	listenSet, exceptionSet;
81 	struct	timeval timeout;
82 	int	selectResult;
83 	GString *lineBuffer;
84 	char	*eoln = NULL;
85 
86 	lineBuffer = g_string_sized_new(1024);
87 
88 	do {
89 		int l;
90 		char buffer[RECV_BUFFER_SIZE];
91 
92 		FD_ZERO(&listenSet);
93 		FD_SET(fileno(stdin), &listenSet);
94 		FD_ZERO(&exceptionSet);
95 		FD_SET(fileno(stdin), &exceptionSet);
96 		timeout.tv_sec = 10; // COMMAND TIMEOUT
97 		timeout.tv_usec = 0;
98 		selectResult = select(fileno(stdin)+1, &listenSet, NULL, &exceptionSet, &timeout);
99 
100 		if (selectResult == -1) {
101 			g_string_free(lineBuffer, TRUE);
102 			return NULL;
103 		}
104 
105 		if (FD_ISSET(fileno(stdin), &exceptionSet)) {
106 			g_string_free(lineBuffer, TRUE);
107 			return NULL;
108 		}
109 
110 		if (!FD_ISSET(fileno(stdin), &listenSet)) {
111 			continue;
112 		}
113 
114 		l = read(fileno(stdin), buffer, RECV_BUFFER_SIZE);
115 		if (l <= 0) {
116 			g_string_free(lineBuffer, TRUE);
117 			return NULL;
118 		}
119 
120 		if (lineBuffer->len + l > MAX_LINE_SIZE) {
121 			g_string_free(lineBuffer, TRUE);
122 			return NULL;
123 		}
124 
125 		g_string_append_len(lineBuffer, buffer, l);
126 		g_strdelimit(lineBuffer->str, "\r\n", '\n');
127 		eoln = strchr(lineBuffer->str, '\n');
128 	} while (eoln == NULL);
129 
130 	*eoln = '\0';
131 	return g_string_free(lineBuffer, FALSE);
132 }
133 
sendLine(const gchar * string)134 static void sendLine(const gchar *string) {
135 	g_mutex_lock(outputMutex);
136 	fprintf(stdout, "%s", string);
137 	fflush(stdout);
138 	g_mutex_unlock(outputMutex);
139 }
140 
sendLinef(const gchar * formatString,...)141 static void sendLinef(const gchar *formatString, ...) {
142 	va_list ap;
143 	va_start(ap, formatString);
144 	g_mutex_lock(outputMutex);
145 	vfprintf(stdout, formatString, ap);
146 	fflush(stdout);
147 	g_mutex_unlock(outputMutex);
148 	va_end(ap);
149 }
150 
parseNextToken(GTokenType expectedTokenType,const char * errorMessage)151 static gboolean parseNextToken(GTokenType expectedTokenType, const char *errorMessage) {
152 	GTokenType tt;
153 	tt = g_scanner_get_next_token(inputScanner);
154 	if (tt != expectedTokenType) {
155 		sendLine(errorMessage);
156 		return FALSE;
157 	}
158 	return TRUE;
159 }
160 
161 #define parseNextString(errorMessage) parseNextToken(G_TOKEN_STRING, errorMessage)
162 #define parseNextInt(errorMessage) parseNextToken(G_TOKEN_INT, errorMessage)
163 
processNextCommand()164 static gboolean processNextCommand() {
165 	gchar		*commandLine;
166 	gboolean	stayConnected = TRUE;
167 	static char	interfaceName[MAX_INTERFACENAME_LENGTH];
168 
169 	commandLine = readNextCommandLine();
170 	if (commandLine == NULL)
171 		return FALSE;
172 
173 	g_scanner_input_text(inputScanner, commandLine, strlen(commandLine));
174 	if (!parseNextString("?:ERR Command expected.\n")) {
175 		goto line_processed;
176 	}
177 
178 	if (inputState == INPUT_STATE_INITIAL && !strcmp(inputScanner->value.v_string, "HELLO")) {
179 		if (!parseNextInt("HELLO:ERR Version argument expected.\n")) {
180 			goto line_processed;
181 		}
182 		if (inputScanner->value.v_int < 1) {
183 			sendLine("HELLO:ERR Unsupported version.\n");
184 			goto line_processed;
185 		}
186 		sendLine("HELLO:OK 1\n");
187 		inputState = INPUT_STATE_COMMAND;
188 		goto line_processed;
189 	}
190 
191 	if (inputState == INPUT_STATE_COMMAND && !strcmp(inputScanner->value.v_string, "SETFILTER")) {
192 		const char * filterValidationError;
193 
194 		if (!parseNextString("SETFILTER:ERR Filter expected.\n")) {
195 			goto line_processed;
196 		}
197 		JCONFIG_BPFFILTERS_SETNONE;
198 
199 		filterValidationError = jutil_ValidateBPFFilter(inputScanner->value.v_string);
200 		if (filterValidationError) {
201 			sendLinef("SETFILTER:ERR Error parsing filter rule: %s.\n", filterValidationError);
202 			goto line_processed;
203 		}
204 
205 		JCONFIG_BPFFILTERS_SETSELECTEDFILTER(JCONFIG_BPFFILTERS_LEN);
206 		jconfig_AddBpfFilter("<fromsetfilter>", g_strdup(inputScanner->value.v_string));
207 		sendLine("SETFILTER:OK Filter set.\n");
208 		goto line_processed;
209 	}
210 
211 	if (inputState == INPUT_STATE_COMMAND && !strcmp(inputScanner->value.v_string, "INTERFACE")) {
212 		if (!parseNextString("INTERFACE:ERR Device name expected.\n")) {
213 			goto line_processed;
214 		}
215 
216 		if (strlen(inputScanner->value.v_string) > MAX_INTERFACENAME_LENGTH-1) {
217 			sendLine("INTERFACE:ERR Interface name too long.\n");
218 			goto line_processed;
219 		}
220 
221 		strcpy(interfaceName, inputScanner->value.v_string);
222 		jconfig_SelectDevice(interfaceName);
223 		sendLine("INTERFACE:OK Interface set.\n");
224 		goto line_processed;
225 	}
226 
227 	if (inputState == INPUT_STATE_COMMAND && !strcmp(inputScanner->value.v_string, "INTERFACES")) {
228 		int i;
229 		char buffer[256];
230 
231 		sendLine("INTERFACES:OK Interface list follows.\n");
232 		for (i=0; i<jdevice_DevicesCount; i++) {
233 			jbase_device *device = jdevice_Devices+i;
234 			jutil_StorageAddress2String(&device->hwaddr, buffer, sizeof(buffer)-1);
235 			sendLinef("INTERFACES:INFO %s\t%d\t%s\n", device->name, ((const struct sockaddr *)&device->hwaddr)->sa_family, buffer);
236 		}
237 		sendLine("INTERFACES:END End of list.\n");
238 		goto line_processed;
239 	}
240 
241 	if (inputState == INPUT_STATE_COMMAND && !strcmp(inputScanner->value.v_string, "RUN")) {
242 		inputState = INPUT_STATE_RUNNING;
243 		goto line_processed;
244 	}
245 
246 	if (inputState == INPUT_STATE_RUNNING && !strcmp(inputScanner->value.v_string, "STOP")) {
247 		inputState = INPUT_STATE_COMMAND;
248 		goto line_processed;
249 	}
250 
251 	if (!strcmp(inputScanner->value.v_string, "EXIT")) {
252 		sendLine("EXIT:OK Good Bye.\n");
253 		stayConnected = FALSE;
254 		goto line_processed;
255 	}
256 
257 	sendLinef("%s:ERR Unknown command.\n", inputScanner->value.v_string);
258 
259 line_processed:
260 	g_free(commandLine);
261 	return stayConnected;
262 }
263 
sendDeleteStream(GString * buffer,jbase_stream * s)264 static void sendDeleteStream(GString *buffer, jbase_stream *s) {
265 	g_string_append_printf(buffer, "SOOB:D %08x%08x\n", (unsigned int)(s->uid>>32), (unsigned int)(s->uid&0xffffffff));
266 }
267 
268 #define SOOBU_MESSAGEFORMAT "SOOB:U $uid$\t$srcbytes$\t$dstbytes$\t$totalbytes$\t$srcpackets$\t$dstpackets$\t$totalpackets$\t$srcbps$\t$dstbps$\t$totalbps$\t$srcpps$\t$dstpps$\t$totalpps$\t$filterdataifchanged$"
269 #define SOOBN_MESSAGEFORMAT "SOOB:N $uid$\t$src$\t$dst$\t$proto$\t$srcport$\t$dstport$"
270 
sendStatistics(GString * buffer)271 static void sendStatistics(GString *buffer) {
272 	g_string_append_printf(buffer, "SOOB:S %u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\t%u\n",
273 		jprocessor_Stats.totalSrcBytes,
274 		jprocessor_Stats.totalDstBytes,
275 		jprocessor_Stats.totalBytes,
276 		jprocessor_Stats.totalSrcPackets,
277 		jprocessor_Stats.totalDstPackets,
278 		jprocessor_Stats.totalPackets,
279 		jprocessor_Stats.totalSrcBPS,
280 		jprocessor_Stats.totalDstBPS,
281 		jprocessor_Stats.totalBPS,
282 		jprocessor_Stats.totalSrcPPS,
283 		jprocessor_Stats.totalDstPPS,
284 		jprocessor_Stats.totalPPS);
285 }
286 
sendUpdateStream(GString * buffer,jbase_stream * s)287 static void sendUpdateStream(GString *buffer, jbase_stream *s) {
288 	jutil_InterpretStreamFormat(buffer, soobuMessageFormat, s);
289 	g_string_append_c(buffer, '\n');
290 	s->filterDataLastDisplayChangeCount = s->filterDataChangeCount;
291 }
292 
sendNewStream(GString * buffer,jbase_stream * s)293 static void sendNewStream(GString *buffer, jbase_stream *s) {
294 	jutil_InterpretStreamFormat(buffer, soobnMessageFormat, s);
295 	g_string_append_c(buffer, '\n');
296 }
297 
resolvedNotifyFunc(jbase_resolv_entry * entry)298 static void resolvedNotifyFunc(jbase_resolv_entry *entry) {
299 	gchar addr[INET6_ADDRSTRLEN + 1];
300 
301 	g_mutex_lock(runningMutex);
302 	if (runningState == RUNNING_STATE_RUNNING) {
303 		jutil_Address2String(entry->af, &entry->addr, addr, INET6_ADDRSTRLEN);
304 		sendLinef("SOOB:L %s\t%s\n", addr, entry->name);
305 	}
306 	g_mutex_unlock(runningMutex);
307 }
308 
processStreamsFunc(GPtrArray * streamArray)309 static void processStreamsFunc(GPtrArray * streamArray) {
310 	guint i;
311 
312 	g_mutex_lock(runningMutex);
313 	if (runningState == RUNNING_STATE_RUNNING) {
314 		GString *buffer = g_string_sized_new(streamArray->len * 100);
315 		sendStatistics(buffer);
316 		for (i=0; i<streamArray->len; i++) {
317 			jbase_stream *s = (jbase_stream *)g_ptr_array_index(streamArray, i);
318 			if (s->dead && !s->displayed) {
319 				continue;
320 			}
321 			if (s->dead) {
322 				sendDeleteStream(buffer, s);
323 				s->displayed = 0;
324 				continue;
325 			}
326 			if (!s->displayed) {
327 				s->displayed = 1;
328 				sendNewStream(buffer, s);
329 			}
330 			sendUpdateStream(buffer, s);
331 		}
332 		sendLinef("%s", buffer->str);
333 		g_string_free(buffer, TRUE);
334 	}
335 	sendLinef("SOOB:E Update finished.\n");
336 	g_mutex_unlock(runningMutex);
337 }
338 
jnetdisplay_PreSetup()339 static gboolean jnetdisplay_PreSetup() {
340 	return TRUE;
341 }
342 
jnetdisplay_Setup()343 static void jnetdisplay_Setup() {
344 	setvbuf(stdin, NULL, _IOLBF, 0);
345 	setvbuf(stdout, NULL, _IOLBF, 0);
346 
347 	inputScanner = g_scanner_new(&scannerConfig);
348 	outputMutex = g_mutex_new();
349 	runningMutex = g_mutex_new();
350 	stringBuffer = g_string_new("");
351 
352 	soobuMessageFormat = g_strdup(SOOBU_MESSAGEFORMAT);
353 	soobnMessageFormat = g_strdup(SOOBN_MESSAGEFORMAT);
354 
355 	jprocessor_Sorting = FALSE;
356 
357 	jprocessor_SetProcessStreamsFunc((ProcessStreamsFunc) processStreamsFunc);
358 	jresolver_SetResolvedNotifyFunc((ResolvedNotifyFunc) resolvedNotifyFunc);
359 }
360 
jnetdisplay_PreRunSetup()361 static gboolean jnetdisplay_PreRunSetup() {
362 	do {
363 		if (!processNextCommand()) {
364 			return FALSE;
365 		}
366 		if (inputState == INPUT_STATE_RUNNING) {
367 			return TRUE;
368 		}
369 	} while (TRUE);
370 }
371 
jnetdisplay_PreRun()372 static void jnetdisplay_PreRun() {
373 }
374 
jnetdisplay_Run()375 static gboolean jnetdisplay_Run() {
376 	sendLine("RUN:OK Running...\n");
377 	g_mutex_lock(runningMutex);
378 	runningState = RUNNING_STATE_RUNNING;
379 	g_mutex_unlock(runningMutex);
380 	while (inputState == INPUT_STATE_RUNNING && processNextCommand()) {
381 	}
382 	g_mutex_lock(runningMutex);
383 	runningState = RUNNING_STATE_DO_NOTHING;
384 	g_mutex_unlock(runningMutex);
385 	if (inputState == INPUT_STATE_COMMAND) {
386 		sendLine("STOP:OK Stopped.\n");
387 	}
388 	return inputState == INPUT_STATE_COMMAND;
389 }
390 
jnetdisplay_Shutdown()391 static void jnetdisplay_Shutdown() {
392 }
393 
jnetdisplay_DrawStatus(const gchar * msg)394 static void jnetdisplay_DrawStatus(const gchar *msg) {
395 	sendLinef("STATUS:OOB %s\n", msg);
396 }
397 
jnetdisplay_ProcessArgument(const gchar ** arg,int argc)398 static int jnetdisplay_ProcessArgument(const gchar **arg, int argc) {
399 	return 0;
400 }
401 
402 jbase_display	jnetdisplay_Functions = {
403 	TRUE,
404 	jnetdisplay_PreSetup,
405 	jnetdisplay_Setup,
406 	jnetdisplay_PreRunSetup,
407 	jnetdisplay_PreRun,
408 	jnetdisplay_Run,
409 	jnetdisplay_Shutdown,
410 	jnetdisplay_DrawStatus,
411 	jnetdisplay_ProcessArgument
412 };
413 
414 #else
415 
416 jbase_display	jnetdisplay_Functions = { FALSE };
417 
418 #endif
419