1 /*
2 ** Copyright (C) 2005-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8
9 /*
10 ** rwaugmentedio.c
11 **
12 ** Suresh L Konda
13 ** routines to do io stuff with augmented records.
14 */
15
16 #include <silk/silk.h>
17
18 RCSIDENT("$SiLK: rwaugmentedio.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");
19
20 /* #define RWPACK_BYTES_PACKETS 1 */
21 #define RWPACK_FLAGS_TIMES_VOLUMES 1
22 #define RWPACK_PROTO_FLAGS 1
23 /* #define RWPACK_SBB_PEF 1 */
24 #define RWPACK_TIME_BYTES_PKTS_FLAGS 1
25 #define RWPACK_TIMES_FLAGS_PROTO 1
26 #include "skstream_priv.h"
27 #include "rwpack.c"
28
29
30 /* Version to use when SK_RECORD_VERSION_ANY is specified */
31 #define DEFAULT_RECORD_VERSION 4
32
33
34 /* ********************************************************************* */
35
36 /*
37 ** RWAUGMENTED VERSION 5
38 **
39 ** in the following: EXPANDED == ((tcp_state & SK_TCPSTATE_EXPANDED) ? 1 : 0)
40 **
41 ** uint32_t rflag_stime; // 0- 3
42 ** // uint32_t rest_flags: 8; // is_tcp==0: Empty; else
43 ** // EXPANDED==0:Empty
44 ** // EXPANDED==1:TCPflags/!1st pkt
45 ** // uint32_t is_tcp : 1; // 1 if FLOW is TCP; 0 otherwise
46 ** // uint32_t unused : 1; // Reserved
47 ** // uint32_t stime :22; // Start time:msec offset from hour
48 **
49 ** uint8_t proto_iflags; // 4 is_tcp==0: Protocol; else:
50 ** // EXPANDED==0:TCPflags/ALL pkts
51 ** // EXPANDED==1:TCPflags/1st pkt
52 ** uint8_t tcp_state; // 5 TCP state machine info
53 ** uint16_t application; // 6- 7 Indication of type of traffic
54 **
55 ** uint16_t sPort; // 8- 9 Source port
56 ** uint16_t dPort; // 10-11 Destination port
57 **
58 ** uint32_t elapsed; // 12-15 Duration of the flow
59 **
60 ** uint32_t pkts; // 16-19 Count of packets
61 ** uint32_t bytes; // 20-23 Count of bytes
62 **
63 ** uint32_t sIP; // 24-27 Source IP
64 ** uint32_t dIP; // 28-31 Destination IP
65 **
66 **
67 ** 32 bytes on disk.
68 */
69
70 #define RECLEN_RWAUGMENTED_V5 32
71
72
73 /*
74 * Byte swap the RWAUGMENTED v5 record 'ar' in place.
75 */
76 #define augmentedioRecordSwap_V5(ar) \
77 { \
78 SWAP_DATA32((ar) + 0); /* rflag_stime */ \
79 /* two single bytes (4)proto_iflags, (5)tcp_state */ \
80 SWAP_DATA16((ar) + 6); /* application */ \
81 SWAP_DATA16((ar) + 8); /* sPort */ \
82 SWAP_DATA16((ar) + 10); /* dPort */ \
83 SWAP_DATA32((ar) + 12); /* elapsed */ \
84 SWAP_DATA32((ar) + 16); /* pkts */ \
85 SWAP_DATA32((ar) + 20); /* bytes */ \
86 SWAP_DATA32((ar) + 24); /* sIP */ \
87 SWAP_DATA32((ar) + 28); /* dIP */ \
88 }
89
90
91 /*
92 * Unpack the array of bytes 'ar' into a record 'rwrec'
93 */
94 static int
augmentedioRecordUnpack_V5(skstream_t * stream,rwGenericRec_V5 * rwrec,uint8_t * ar)95 augmentedioRecordUnpack_V5(
96 skstream_t *stream,
97 rwGenericRec_V5 *rwrec,
98 uint8_t *ar)
99 {
100 /* swap if required */
101 if (stream->swapFlag) {
102 augmentedioRecordSwap_V5(ar);
103 }
104
105 /* Start time, TCP flags, Protocol, TCP State */
106 rwpackUnpackTimesFlagsProto(rwrec, ar, stream->hdr_starttime);
107
108 /* application */
109 rwRecMemSetApplication(rwrec, &ar[ 6]);
110
111 /* sPort, dPort */
112 rwRecMemSetSPort(rwrec, &ar[ 8]);
113 rwRecMemSetDPort(rwrec, &ar[10]);
114
115 /* Elapsed */
116 rwRecMemSetElapsed(rwrec, &ar[12]);
117
118 /* packets, bytes */
119 rwRecMemSetPkts(rwrec, &ar[16]);
120 rwRecMemSetBytes(rwrec, &ar[20]);
121
122 /* sIP, dIP */
123 rwRecMemSetSIPv4(rwrec, &ar[24]);
124 rwRecMemSetDIPv4(rwrec, &ar[28]);
125
126 /* sensor, flow_type from file name/header */
127 rwRecSetSensor(rwrec, stream->hdr_sensor);
128 rwRecSetFlowType(rwrec, stream->hdr_flowtype);
129
130 return SKSTREAM_OK;
131 }
132
133
134 /*
135 * Pack the record 'rwrec' into an array of bytes 'ar'
136 */
137 static int
augmentedioRecordPack_V5(skstream_t * stream,const rwGenericRec_V5 * rwrec,uint8_t * ar)138 augmentedioRecordPack_V5(
139 skstream_t *stream,
140 const rwGenericRec_V5 *rwrec,
141 uint8_t *ar)
142 {
143 int rv;
144
145 /* Start time, TCP Flags, Protocol, TCP State */
146 rv = rwpackPackTimesFlagsProto(rwrec, ar, stream->hdr_starttime);
147 if (rv) {
148 return rv;
149 }
150
151 /* application */
152 rwRecMemGetApplication(rwrec, &ar[6]);
153
154 /* sPort, dPort */
155 rwRecMemGetSPort(rwrec, &ar[ 8]);
156 rwRecMemGetDPort(rwrec, &ar[10]);
157
158 /* Elapsed */
159 rwRecMemGetElapsed(rwrec, &ar[12]);
160
161 /* packets, bytes */
162 rwRecMemGetPkts(rwrec, &ar[16]);
163 rwRecMemGetBytes(rwrec, &ar[20]);
164
165 /* sIP, dIP */
166 rwRecMemGetSIPv4(rwrec, &ar[24]);
167 rwRecMemGetDIPv4(rwrec, &ar[28]);
168
169 /* swap if required */
170 if (stream->swapFlag) {
171 augmentedioRecordSwap_V5(ar);
172 }
173
174 return SKSTREAM_OK;
175 }
176
177
178 /* ********************************************************************* */
179
180 /*
181 ** RWAUGMENTED VERSION 4
182 **
183 ** in the following: EXPANDED == ((tcp_state & SK_TCPSTATE_EXPANDED) ? 1 : 0)
184 **
185 ** uint32_t stime_bb1; // 0- 3
186 ** // uint32_t stime :22 // Start time:msec offset from hour
187 ** // uint32_t bPPkt1 :10; // Whole bytes-per-packet (hi 10)
188 **
189 ** uint32_t bb2_elapsed; // 4- 7
190 ** // uint32_t bPPkt2 : 4; // Whole bytes-per-packet (low 4)
191 ** // uint32_t bPPFrac : 6; // Fractional bytes-per-packet
192 ** // uint32_t elapsed :22; // Duration of flow in msec
193 **
194 ** uint32_t pro_flg_pkts; // 8-11
195 ** // uint32_t prot_flags: 8; // is_tcp==0: IP protocol
196 ** // is_tcp==1 &&
197 ** // EXPANDED==0:TCPflags/All pkts
198 ** // EXPANDED==1:TCPflags/1st pkt
199 ** // uint32_t pflag : 1; // 'pkts' requires multiplier?
200 ** // uint32_t is_tcp : 1; // 1 if flow is TCP; 0 otherwise
201 ** // uint32_t padding : 2; //
202 ** // uint32_t pkts :20; // Count of packets
203 **
204 ** uint8_t tcp_state; // 12 TCP state machine info
205 ** uint8_t rest_flags; // 13 is_tcp==0: Flow's reported flags
206 ** // is_tcp==1 &&
207 ** // EXPANDED==0:Empty
208 ** // EXPANDED==1:TCPflags/!1st pkt
209 ** uint16_t application; // 14-15 Type of traffic
210 **
211 ** uint16_t sPort; // 16-17 Source port
212 ** uint16_t dPort; // 18-19 Destination port
213 **
214 ** uint32_t sIP; // 20-23 Source IP
215 ** uint32_t dIP; // 24-27 Destination IP
216 **
217 **
218 ** 28 bytes on disk.
219 */
220
221 #define RECLEN_RWAUGMENTED_V4 28
222
223
224 /*
225 * Byte swap the RWAUGMENTED v4 record 'ar' in place.
226 */
227 #define augmentedioRecordSwap_V4(ar) \
228 { \
229 SWAP_DATA32((ar) + 0); /* stime_bb1 */ \
230 SWAP_DATA32((ar) + 4); /* bb2_elapsed */ \
231 SWAP_DATA32((ar) + 8); /* pro_flg_pkts */ \
232 /* two single bytes (12)tcp_state, (13)rest_flags */ \
233 SWAP_DATA16((ar) + 14); /* application */ \
234 SWAP_DATA16((ar) + 16); /* sPort */ \
235 SWAP_DATA16((ar) + 18); /* dPort */ \
236 SWAP_DATA32((ar) + 20); /* sIP */ \
237 SWAP_DATA32((ar) + 24); /* dIP */ \
238 }
239
240
241 /*
242 * Unpack the array of bytes 'ar' into a record 'rwrec'
243 */
244 static int
augmentedioRecordUnpack_V4(skstream_t * stream,rwGenericRec_V5 * rwrec,uint8_t * ar)245 augmentedioRecordUnpack_V4(
246 skstream_t *stream,
247 rwGenericRec_V5 *rwrec,
248 uint8_t *ar)
249 {
250 /* swap if required */
251 if (stream->swapFlag) {
252 augmentedioRecordSwap_V4(ar);
253 }
254
255 /* sTime, elapsed, pkts, bytes, proto, tcp-flags, state, application */
256 rwpackUnpackFlagsTimesVolumes(rwrec, ar, stream->hdr_starttime, 16, 0);
257
258 /* sPort, dPort */
259 rwRecMemSetSPort(rwrec, &ar[16]);
260 rwRecMemSetDPort(rwrec, &ar[18]);
261
262 /* sIP, dIP */
263 rwRecMemSetSIPv4(rwrec, &ar[20]);
264 rwRecMemSetDIPv4(rwrec, &ar[24]);
265
266 /* sensor, flow_type from file name/header */
267 rwRecSetSensor(rwrec, stream->hdr_sensor);
268 rwRecSetFlowType(rwrec, stream->hdr_flowtype);
269
270 return SKSTREAM_OK;
271 }
272
273
274 /*
275 * Pack the record 'rwrec' into an array of bytes 'ar'
276 */
277 static int
augmentedioRecordPack_V4(skstream_t * stream,const rwGenericRec_V5 * rwrec,uint8_t * ar)278 augmentedioRecordPack_V4(
279 skstream_t *stream,
280 const rwGenericRec_V5 *rwrec,
281 uint8_t *ar)
282 {
283 int rv = SKSTREAM_OK; /* return value */
284
285 /* sTime, elapsed, pkts, bytes, proto, tcp-flags, state, application */
286 rv = rwpackPackFlagsTimesVolumes(ar, rwrec, stream->hdr_starttime, 16);
287 if (rv) {
288 return rv;
289 }
290
291 /* sPort, dPort */
292 rwRecMemGetSPort(rwrec, &ar[16]);
293 rwRecMemGetDPort(rwrec, &ar[18]);
294
295 /* sIP, dIP */
296 rwRecMemGetSIPv4(rwrec, &ar[20]);
297 rwRecMemGetDIPv4(rwrec, &ar[24]);
298
299 /* swap if required */
300 if (stream->swapFlag) {
301 augmentedioRecordSwap_V4(ar);
302 }
303
304 return SKSTREAM_OK;
305 }
306
307
308 /* ********************************************************************* */
309
310 /*
311 ** RWAUGMENTED VERSION 1
312 ** RWAUGMENTED VERSION 2
313 ** RWAUGMENTED VERSION 3
314 **
315 ** uint32_t sIP; // 0- 3 Source IP
316 ** uint32_t dIP; // 4- 7 Destination IP
317 **
318 ** uint16_t sPort; // 8- 9 Source port
319 ** uint16_t dPort; // 10-11 Destination port
320 **
321 ** uint32_t pkts_stime; // 12-15
322 ** // uint32_t pkts :20; // Count of packets
323 ** // uint32_t sTime :12; // Start time--offset from hour
324 **
325 ** uint32_t bbe; // 16-19
326 ** // uint32_t bPPkt :14; // Whole bytes-per-packet
327 ** // uint32_t bPPFrac : 6; // Fractional bytes-per-packet
328 ** // uint32_t elapsed :12; // Duration of flow
329 **
330 ** uint32_t msec_flags // 20-23
331 ** // uint32_t sTime_msec:10; // Fractional sTime (millisec)
332 ** // uint32_t elaps_msec:10; // Fractional elapsed (millisec)
333 ** // uint32_t pflag : 1; // 'pkts' requires multiplier?
334 ** // uint32_t is_tcp : 1; // 1 if flow is TCP; 0 otherwise
335 ** // uint32_t padding : 2; // padding/reserved
336 ** // uint32_t prot_flags: 8; // is_tcp==0: IP protocol
337 ** // is_tcp==1 &&
338 ** // EXPANDED==0:TCPflags/All pkts
339 ** // EXPANDED==1:TCPflags/1st pkt
340 **
341 ** uint16_t application; // 24-25 Type of traffic
342 **
343 ** uint8_t tcp_state; // 26 TCP state machine info
344 ** uint8_t rest_flags; // 27 is_tcp==0: Flow's reported flags
345 ** // is_tcp==1 &&
346 ** // EXPANDED==0:Empty
347 ** // EXPANDED==1:TCPflags/!1st pkt
348 **
349 **
350 ** 28 bytes on disk.
351 */
352
353 #define RECLEN_RWAUGMENTED_V1 28
354 #define RECLEN_RWAUGMENTED_V2 28
355 #define RECLEN_RWAUGMENTED_V3 28
356
357
358 /*
359 * Byte swap the RWAUGMENTED v1 record 'ar' in place.
360 */
361 #define augmentedioRecordSwap_V1(ar) \
362 { \
363 SWAP_DATA32((ar) + 0); /* sIP */ \
364 SWAP_DATA32((ar) + 4); /* dIP */ \
365 SWAP_DATA16((ar) + 8); /* sPort */ \
366 SWAP_DATA16((ar) + 10); /* dPort */ \
367 SWAP_DATA32((ar) + 12); /* pkts_stime */ \
368 SWAP_DATA32((ar) + 16); /* bbe */ \
369 SWAP_DATA32((ar) + 20); /* msec_flags */ \
370 SWAP_DATA16((ar) + 24); /* application */ \
371 /* Two single bytes: (26)tcp_state, (27)rest_flags */ \
372 }
373
374
375 /*
376 * Unpack the array of bytes 'ar' into a record 'rwrec'
377 */
378 static int
augmentedioRecordUnpack_V1(skstream_t * stream,rwGenericRec_V5 * rwrec,uint8_t * ar)379 augmentedioRecordUnpack_V1(
380 skstream_t *stream,
381 rwGenericRec_V5 *rwrec,
382 uint8_t *ar)
383 {
384 uint32_t msec_flags;
385 uint8_t is_tcp, prot_flags;
386
387 /* swap if required */
388 if (stream->swapFlag) {
389 augmentedioRecordSwap_V1(ar);
390 }
391
392 /* sIP, dIP, sPort, dPort */
393 rwRecMemSetSIPv4(rwrec, &ar[0]);
394 rwRecMemSetDIPv4(rwrec, &ar[4]);
395 rwRecMemSetSPort(rwrec, &ar[8]);
396 rwRecMemSetDPort(rwrec, &ar[10]);
397
398 /* msec times, proto or flags */
399 memcpy(&msec_flags, &ar[20], 4);
400
401 /* application */
402 rwRecMemSetApplication(rwrec, &ar[24]);
403
404 /* sTime, pkts, bytes, elapsed, proto, tcp-flags, bpp */
405 rwpackUnpackTimeBytesPktsFlags(rwrec, stream->hdr_starttime,
406 (uint32_t*)&ar[12], (uint32_t*)&ar[16],
407 &msec_flags);
408
409 /* extra TCP information */
410 is_tcp = (uint8_t)GET_MASKED_BITS(msec_flags, 10, 1);
411 prot_flags = (uint8_t)GET_MASKED_BITS(msec_flags, 0, 8);
412 rwpackUnpackProtoFlags(rwrec, is_tcp, prot_flags, ar[26], ar[27]);
413
414 /* sensor, flow_type from file name/header */
415 rwRecSetSensor(rwrec, stream->hdr_sensor);
416 rwRecSetFlowType(rwrec, stream->hdr_flowtype);
417
418 return SKSTREAM_OK;
419 }
420
421
422 /*
423 * Pack the record 'rwrec' into an array of bytes 'ar'
424 */
425 static int
augmentedioRecordPack_V1(skstream_t * stream,const rwGenericRec_V5 * rwrec,uint8_t * ar)426 augmentedioRecordPack_V1(
427 skstream_t *stream,
428 const rwGenericRec_V5 *rwrec,
429 uint8_t *ar)
430 {
431 int rv = SKSTREAM_OK; /* return value */
432 uint32_t msec_flags;
433 uint8_t is_tcp, prot_flags;
434
435 /* sTime, pkts, bytes, elapsed, proto, tcp-flags, bpp */
436 rv = rwpackPackTimeBytesPktsFlags((uint32_t*)&ar[12], (uint32_t*)&ar[16],
437 &msec_flags,
438 rwrec, stream->hdr_starttime);
439 if (rv) {
440 return rv;
441 }
442
443 rwpackPackProtoFlags(&is_tcp, &prot_flags, &ar[26], &ar[27], rwrec);
444
445 /* msec_flags: sTime_msec:10; elaps_msec:10; pflag:1;
446 * is_tcp:1; pad:2; prot_flags:8; */
447 /* overwrite the least significant 11 bits */
448 msec_flags = ((msec_flags & (MASKARRAY_21 << 11))
449 | (is_tcp ? (1 << 10) : 0)
450 | prot_flags);
451
452 /* sIP, dIP, sPort, dPort */
453 rwRecMemGetSIPv4(rwrec, &ar[0]);
454 rwRecMemGetDIPv4(rwrec, &ar[4]);
455 rwRecMemGetSPort(rwrec, &ar[8]);
456 rwRecMemGetDPort(rwrec, &ar[10]);
457
458 /* msec_flags */
459 memcpy(&ar[20], &msec_flags, 4);
460
461 /* application */
462 rwRecMemGetApplication(rwrec, &ar[24]);
463
464 /* swap if required */
465 if (stream->swapFlag) {
466 augmentedioRecordSwap_V1(ar);
467 }
468
469 return SKSTREAM_OK;
470 }
471
472
473 /* ********************************************************************* */
474
475 /*
476 * Return length of record of specified version, or 0 if no such
477 * version exists. See skstream_priv.h for details.
478 */
479 uint16_t
augmentedioGetRecLen(sk_file_version_t vers)480 augmentedioGetRecLen(
481 sk_file_version_t vers)
482 {
483 switch (vers) {
484 case 1:
485 return RECLEN_RWAUGMENTED_V1;
486 case 2:
487 return RECLEN_RWAUGMENTED_V2;
488 case 3:
489 return RECLEN_RWAUGMENTED_V3;
490 case 4:
491 return RECLEN_RWAUGMENTED_V4;
492 case 5:
493 return RECLEN_RWAUGMENTED_V5;
494 default:
495 return 0;
496 }
497 }
498
499
500 /*
501 * status = augmentedioPrepare(&stream);
502 *
503 * Sets the record version to the default if it is unspecified,
504 * checks that the record format supports the requested record
505 * version, sets the record length, and sets the pack and unpack
506 * functions for this record format and version.
507 */
508 int
augmentedioPrepare(skstream_t * stream)509 augmentedioPrepare(
510 skstream_t *stream)
511 {
512 #define FILE_FORMAT "FT_RWAUGMENTED"
513 sk_file_header_t *hdr = stream->silk_hdr;
514 int rv = SKSTREAM_OK; /* return value */
515
516 assert(skHeaderGetFileFormat(hdr) == FT_RWAUGMENTED);
517
518 /* Set version if none was selected by caller */
519 if ((stream->io_mode == SK_IO_WRITE)
520 && (skHeaderGetRecordVersion(hdr) == SK_RECORD_VERSION_ANY))
521 {
522 skHeaderSetRecordVersion(hdr, DEFAULT_RECORD_VERSION);
523 }
524
525 /* version check; set values based on version */
526 switch (skHeaderGetRecordVersion(hdr)) {
527 case 5:
528 stream->rwUnpackFn = &augmentedioRecordUnpack_V5;
529 stream->rwPackFn = &augmentedioRecordPack_V5;
530 break;
531 case 4:
532 stream->rwUnpackFn = &augmentedioRecordUnpack_V4;
533 stream->rwPackFn = &augmentedioRecordPack_V4;
534 break;
535 case 3:
536 case 2:
537 case 1:
538 /* V1 and V2 differ only in the padding of the header */
539 /* V2 and V3 differ only in that V3 supports compression on
540 * read and write; V2 supports compression only on read */
541 stream->rwUnpackFn = &augmentedioRecordUnpack_V1;
542 stream->rwPackFn = &augmentedioRecordPack_V1;
543 break;
544 case 0:
545 default:
546 rv = SKSTREAM_ERR_UNSUPPORT_VERSION;
547 goto END;
548 }
549
550 stream->recLen = augmentedioGetRecLen(skHeaderGetRecordVersion(hdr));
551
552 /* verify lengths */
553 if (stream->recLen == 0) {
554 skAppPrintErr("Record length not set for %s version %u",
555 FILE_FORMAT, (unsigned)skHeaderGetRecordVersion(hdr));
556 skAbort();
557 }
558 if (stream->recLen != skHeaderGetRecordLength(hdr)) {
559 if (0 == skHeaderGetRecordLength(hdr)) {
560 skHeaderSetRecordLength(hdr, stream->recLen);
561 } else {
562 skAppPrintErr(("Record length mismatch for %s version %u\n"
563 "\tcode = %" PRIu16 " bytes; header = %lu bytes"),
564 FILE_FORMAT, (unsigned)skHeaderGetRecordVersion(hdr),
565 stream->recLen,
566 (unsigned long)skHeaderGetRecordLength(hdr));
567 skAbort();
568 }
569 }
570
571 END:
572 return rv;
573 }
574
575
576 /*
577 ** Local Variables:
578 ** mode:c
579 ** indent-tabs-mode:nil
580 ** c-basic-offset:4
581 ** End:
582 */
583