1 /*
2 ** Copyright (C) 2005-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8 
9 /*
10 ** rwaugmentedio.c
11 **
12 ** Suresh L Konda
13 **      routines to do io stuff with augmented records.
14 */
15 
16 #include <silk/silk.h>
17 
18 RCSIDENT("$SiLK: rwaugmentedio.c ef14e54179be 2020-04-14 21:57:45Z mthomas $");
19 
20 /* #define RWPACK_BYTES_PACKETS          1 */
21 #define RWPACK_FLAGS_TIMES_VOLUMES    1
22 #define RWPACK_PROTO_FLAGS            1
23 /* #define RWPACK_SBB_PEF                1 */
24 #define RWPACK_TIME_BYTES_PKTS_FLAGS  1
25 #define RWPACK_TIMES_FLAGS_PROTO      1
26 #include "skstream_priv.h"
27 #include "rwpack.c"
28 
29 
30 /* Version to use when SK_RECORD_VERSION_ANY is specified */
31 #define DEFAULT_RECORD_VERSION 4
32 
33 
34 /* ********************************************************************* */
35 
36 /*
37 **  RWAUGMENTED VERSION 5
38 **
39 **  in the following: EXPANDED == ((tcp_state & SK_TCPSTATE_EXPANDED) ? 1 : 0)
40 **
41 **    uint32_t      rflag_stime;     //  0- 3
42 **    // uint32_t     rest_flags: 8; //        is_tcp==0: Empty; else
43 **                                   //          EXPANDED==0:Empty
44 **                                   //          EXPANDED==1:TCPflags/!1st pkt
45 **    // uint32_t     is_tcp    : 1; //        1 if FLOW is TCP; 0 otherwise
46 **    // uint32_t     unused    : 1; //        Reserved
47 **    // uint32_t     stime     :22; //        Start time:msec offset from hour
48 **
49 **    uint8_t       proto_iflags;    //  4     is_tcp==0: Protocol; else:
50 **                                   //          EXPANDED==0:TCPflags/ALL pkts
51 **                                   //          EXPANDED==1:TCPflags/1st pkt
52 **    uint8_t       tcp_state;       //  5     TCP state machine info
53 **    uint16_t      application;     //  6- 7  Indication of type of traffic
54 **
55 **    uint16_t      sPort;           //  8- 9  Source port
56 **    uint16_t      dPort;           // 10-11  Destination port
57 **
58 **    uint32_t      elapsed;         // 12-15  Duration of the flow
59 **
60 **    uint32_t      pkts;            // 16-19  Count of packets
61 **    uint32_t      bytes;           // 20-23  Count of bytes
62 **
63 **    uint32_t      sIP;             // 24-27  Source IP
64 **    uint32_t      dIP;             // 28-31  Destination IP
65 **
66 **
67 **  32 bytes on disk.
68 */
69 
70 #define RECLEN_RWAUGMENTED_V5 32
71 
72 
73 /*
74  *    Byte swap the RWAUGMENTED v5 record 'ar' in place.
75  */
76 #define augmentedioRecordSwap_V5(ar)                            \
77     {                                                           \
78         SWAP_DATA32((ar) +  0);   /* rflag_stime */             \
79         /* two single bytes (4)proto_iflags, (5)tcp_state */    \
80         SWAP_DATA16((ar) +  6);   /* application */             \
81         SWAP_DATA16((ar) +  8);   /* sPort */                   \
82         SWAP_DATA16((ar) + 10);   /* dPort */                   \
83         SWAP_DATA32((ar) + 12);   /* elapsed */                 \
84         SWAP_DATA32((ar) + 16);   /* pkts */                    \
85         SWAP_DATA32((ar) + 20);   /* bytes */                   \
86         SWAP_DATA32((ar) + 24);   /* sIP */                     \
87         SWAP_DATA32((ar) + 28);   /* dIP */                     \
88     }
89 
90 
91 /*
92  *  Unpack the array of bytes 'ar' into a record 'rwrec'
93  */
94 static int
augmentedioRecordUnpack_V5(skstream_t * stream,rwGenericRec_V5 * rwrec,uint8_t * ar)95 augmentedioRecordUnpack_V5(
96     skstream_t         *stream,
97     rwGenericRec_V5    *rwrec,
98     uint8_t            *ar)
99 {
100     /* swap if required */
101     if (stream->swapFlag) {
102         augmentedioRecordSwap_V5(ar);
103     }
104 
105     /* Start time, TCP flags, Protocol, TCP State */
106     rwpackUnpackTimesFlagsProto(rwrec, ar, stream->hdr_starttime);
107 
108     /* application */
109     rwRecMemSetApplication(rwrec, &ar[ 6]);
110 
111     /* sPort, dPort */
112     rwRecMemSetSPort(rwrec, &ar[ 8]);
113     rwRecMemSetDPort(rwrec, &ar[10]);
114 
115     /* Elapsed */
116     rwRecMemSetElapsed(rwrec, &ar[12]);
117 
118     /* packets, bytes */
119     rwRecMemSetPkts(rwrec,  &ar[16]);
120     rwRecMemSetBytes(rwrec, &ar[20]);
121 
122     /* sIP, dIP */
123     rwRecMemSetSIPv4(rwrec, &ar[24]);
124     rwRecMemSetDIPv4(rwrec, &ar[28]);
125 
126     /* sensor, flow_type from file name/header */
127     rwRecSetSensor(rwrec, stream->hdr_sensor);
128     rwRecSetFlowType(rwrec, stream->hdr_flowtype);
129 
130     return SKSTREAM_OK;
131 }
132 
133 
134 /*
135  *  Pack the record 'rwrec' into an array of bytes 'ar'
136  */
137 static int
augmentedioRecordPack_V5(skstream_t * stream,const rwGenericRec_V5 * rwrec,uint8_t * ar)138 augmentedioRecordPack_V5(
139     skstream_t             *stream,
140     const rwGenericRec_V5  *rwrec,
141     uint8_t                *ar)
142 {
143     int rv;
144 
145     /* Start time, TCP Flags, Protocol, TCP State */
146     rv = rwpackPackTimesFlagsProto(rwrec, ar, stream->hdr_starttime);
147     if (rv) {
148         return rv;
149     }
150 
151     /* application */
152     rwRecMemGetApplication(rwrec, &ar[6]);
153 
154     /* sPort, dPort */
155     rwRecMemGetSPort(rwrec, &ar[ 8]);
156     rwRecMemGetDPort(rwrec, &ar[10]);
157 
158     /* Elapsed */
159     rwRecMemGetElapsed(rwrec, &ar[12]);
160 
161     /* packets, bytes */
162     rwRecMemGetPkts(rwrec,  &ar[16]);
163     rwRecMemGetBytes(rwrec, &ar[20]);
164 
165     /* sIP, dIP */
166     rwRecMemGetSIPv4(rwrec, &ar[24]);
167     rwRecMemGetDIPv4(rwrec, &ar[28]);
168 
169     /* swap if required */
170     if (stream->swapFlag) {
171         augmentedioRecordSwap_V5(ar);
172     }
173 
174     return SKSTREAM_OK;
175 }
176 
177 
178 /* ********************************************************************* */
179 
180 /*
181 **  RWAUGMENTED VERSION 4
182 **
183 **  in the following: EXPANDED == ((tcp_state & SK_TCPSTATE_EXPANDED) ? 1 : 0)
184 **
185 **    uint32_t      stime_bb1;       //  0- 3
186 **    // uint32_t     stime     :22  //        Start time:msec offset from hour
187 **    // uint32_t     bPPkt1    :10; //        Whole bytes-per-packet (hi 10)
188 **
189 **    uint32_t      bb2_elapsed;     //  4- 7
190 **    // uint32_t     bPPkt2    : 4; //        Whole bytes-per-packet (low 4)
191 **    // uint32_t     bPPFrac   : 6; //        Fractional bytes-per-packet
192 **    // uint32_t     elapsed   :22; //        Duration of flow in msec
193 **
194 **    uint32_t      pro_flg_pkts;    //  8-11
195 **    // uint32_t     prot_flags: 8; //        is_tcp==0: IP protocol
196 **                                   //        is_tcp==1 &&
197 **                                   //          EXPANDED==0:TCPflags/All pkts
198 **                                   //          EXPANDED==1:TCPflags/1st pkt
199 **    // uint32_t     pflag     : 1; //        'pkts' requires multiplier?
200 **    // uint32_t     is_tcp    : 1; //        1 if flow is TCP; 0 otherwise
201 **    // uint32_t     padding   : 2; //
202 **    // uint32_t     pkts      :20; //        Count of packets
203 **
204 **    uint8_t       tcp_state;       // 12     TCP state machine info
205 **    uint8_t       rest_flags;      // 13     is_tcp==0: Flow's reported flags
206 **                                   //        is_tcp==1 &&
207 **                                   //          EXPANDED==0:Empty
208 **                                   //          EXPANDED==1:TCPflags/!1st pkt
209 **    uint16_t      application;     // 14-15  Type of traffic
210 **
211 **    uint16_t      sPort;           // 16-17  Source port
212 **    uint16_t      dPort;           // 18-19  Destination port
213 **
214 **    uint32_t      sIP;             // 20-23  Source IP
215 **    uint32_t      dIP;             // 24-27  Destination IP
216 **
217 **
218 **  28 bytes on disk.
219 */
220 
221 #define RECLEN_RWAUGMENTED_V4 28
222 
223 
224 /*
225  *    Byte swap the RWAUGMENTED v4 record 'ar' in place.
226  */
227 #define augmentedioRecordSwap_V4(ar)                            \
228     {                                                           \
229         SWAP_DATA32((ar) +  0);   /* stime_bb1 */               \
230         SWAP_DATA32((ar) +  4);   /* bb2_elapsed */             \
231         SWAP_DATA32((ar) +  8);   /* pro_flg_pkts */            \
232         /* two single bytes (12)tcp_state, (13)rest_flags */    \
233         SWAP_DATA16((ar) + 14);   /* application */             \
234         SWAP_DATA16((ar) + 16);   /* sPort */                   \
235         SWAP_DATA16((ar) + 18);   /* dPort */                   \
236         SWAP_DATA32((ar) + 20);   /* sIP */                     \
237         SWAP_DATA32((ar) + 24);   /* dIP */                     \
238     }
239 
240 
241 /*
242  *  Unpack the array of bytes 'ar' into a record 'rwrec'
243  */
244 static int
augmentedioRecordUnpack_V4(skstream_t * stream,rwGenericRec_V5 * rwrec,uint8_t * ar)245 augmentedioRecordUnpack_V4(
246     skstream_t         *stream,
247     rwGenericRec_V5    *rwrec,
248     uint8_t            *ar)
249 {
250     /* swap if required */
251     if (stream->swapFlag) {
252         augmentedioRecordSwap_V4(ar);
253     }
254 
255     /* sTime, elapsed, pkts, bytes, proto, tcp-flags, state, application */
256     rwpackUnpackFlagsTimesVolumes(rwrec, ar, stream->hdr_starttime, 16, 0);
257 
258     /* sPort, dPort */
259     rwRecMemSetSPort(rwrec, &ar[16]);
260     rwRecMemSetDPort(rwrec, &ar[18]);
261 
262     /* sIP, dIP */
263     rwRecMemSetSIPv4(rwrec, &ar[20]);
264     rwRecMemSetDIPv4(rwrec, &ar[24]);
265 
266     /* sensor, flow_type from file name/header */
267     rwRecSetSensor(rwrec, stream->hdr_sensor);
268     rwRecSetFlowType(rwrec, stream->hdr_flowtype);
269 
270     return SKSTREAM_OK;
271 }
272 
273 
274 /*
275  *  Pack the record 'rwrec' into an array of bytes 'ar'
276  */
277 static int
augmentedioRecordPack_V4(skstream_t * stream,const rwGenericRec_V5 * rwrec,uint8_t * ar)278 augmentedioRecordPack_V4(
279     skstream_t             *stream,
280     const rwGenericRec_V5  *rwrec,
281     uint8_t                *ar)
282 {
283     int rv = SKSTREAM_OK; /* return value */
284 
285     /* sTime, elapsed, pkts, bytes, proto, tcp-flags, state, application */
286     rv = rwpackPackFlagsTimesVolumes(ar, rwrec, stream->hdr_starttime, 16);
287     if (rv) {
288         return rv;
289     }
290 
291     /* sPort, dPort */
292     rwRecMemGetSPort(rwrec, &ar[16]);
293     rwRecMemGetDPort(rwrec, &ar[18]);
294 
295     /* sIP, dIP */
296     rwRecMemGetSIPv4(rwrec, &ar[20]);
297     rwRecMemGetDIPv4(rwrec, &ar[24]);
298 
299     /* swap if required */
300     if (stream->swapFlag) {
301         augmentedioRecordSwap_V4(ar);
302     }
303 
304     return SKSTREAM_OK;
305 }
306 
307 
308 /* ********************************************************************* */
309 
310 /*
311 **  RWAUGMENTED VERSION 1
312 **  RWAUGMENTED VERSION 2
313 **  RWAUGMENTED VERSION 3
314 **
315 **    uint32_t      sIP;             //  0- 3  Source IP
316 **    uint32_t      dIP;             //  4- 7  Destination IP
317 **
318 **    uint16_t      sPort;           //  8- 9  Source port
319 **    uint16_t      dPort;           // 10-11  Destination port
320 **
321 **    uint32_t      pkts_stime;      // 12-15
322 **    // uint32_t     pkts      :20; //        Count of packets
323 **    // uint32_t     sTime     :12; //        Start time--offset from hour
324 **
325 **    uint32_t      bbe;             // 16-19
326 **    // uint32_t     bPPkt     :14; //        Whole bytes-per-packet
327 **    // uint32_t     bPPFrac   : 6; //        Fractional bytes-per-packet
328 **    // uint32_t     elapsed   :12; //        Duration of flow
329 **
330 **    uint32_t      msec_flags       // 20-23
331 **    // uint32_t     sTime_msec:10; //        Fractional sTime (millisec)
332 **    // uint32_t     elaps_msec:10; //        Fractional elapsed (millisec)
333 **    // uint32_t     pflag     : 1; //        'pkts' requires multiplier?
334 **    // uint32_t     is_tcp    : 1; //        1 if flow is TCP; 0 otherwise
335 **    // uint32_t     padding   : 2; //        padding/reserved
336 **    // uint32_t     prot_flags: 8; //        is_tcp==0: IP protocol
337 **                                   //        is_tcp==1 &&
338 **                                   //          EXPANDED==0:TCPflags/All pkts
339 **                                   //          EXPANDED==1:TCPflags/1st pkt
340 **
341 **    uint16_t      application;     // 24-25  Type of traffic
342 **
343 **    uint8_t       tcp_state;       // 26     TCP state machine info
344 **    uint8_t       rest_flags;      // 27     is_tcp==0: Flow's reported flags
345 **                                   //        is_tcp==1 &&
346 **                                   //          EXPANDED==0:Empty
347 **                                   //          EXPANDED==1:TCPflags/!1st pkt
348 **
349 **
350 **  28 bytes on disk.
351 */
352 
353 #define RECLEN_RWAUGMENTED_V1 28
354 #define RECLEN_RWAUGMENTED_V2 28
355 #define RECLEN_RWAUGMENTED_V3 28
356 
357 
358 /*
359  *    Byte swap the RWAUGMENTED v1 record 'ar' in place.
360  */
361 #define augmentedioRecordSwap_V1(ar)                            \
362     {                                                           \
363         SWAP_DATA32((ar) +  0);   /* sIP */                     \
364         SWAP_DATA32((ar) +  4);   /* dIP */                     \
365         SWAP_DATA16((ar) +  8);   /* sPort */                   \
366         SWAP_DATA16((ar) + 10);   /* dPort */                   \
367         SWAP_DATA32((ar) + 12);   /* pkts_stime */              \
368         SWAP_DATA32((ar) + 16);   /* bbe */                     \
369         SWAP_DATA32((ar) + 20);   /* msec_flags */              \
370         SWAP_DATA16((ar) + 24);   /* application */             \
371         /* Two single bytes: (26)tcp_state, (27)rest_flags */   \
372     }
373 
374 
375 /*
376  *  Unpack the array of bytes 'ar' into a record 'rwrec'
377  */
378 static int
augmentedioRecordUnpack_V1(skstream_t * stream,rwGenericRec_V5 * rwrec,uint8_t * ar)379 augmentedioRecordUnpack_V1(
380     skstream_t         *stream,
381     rwGenericRec_V5    *rwrec,
382     uint8_t            *ar)
383 {
384     uint32_t msec_flags;
385     uint8_t is_tcp, prot_flags;
386 
387     /* swap if required */
388     if (stream->swapFlag) {
389         augmentedioRecordSwap_V1(ar);
390     }
391 
392     /* sIP, dIP, sPort, dPort */
393     rwRecMemSetSIPv4(rwrec, &ar[0]);
394     rwRecMemSetDIPv4(rwrec, &ar[4]);
395     rwRecMemSetSPort(rwrec, &ar[8]);
396     rwRecMemSetDPort(rwrec, &ar[10]);
397 
398     /* msec times, proto or flags */
399     memcpy(&msec_flags, &ar[20], 4);
400 
401     /* application */
402     rwRecMemSetApplication(rwrec, &ar[24]);
403 
404     /* sTime, pkts, bytes, elapsed, proto, tcp-flags, bpp */
405     rwpackUnpackTimeBytesPktsFlags(rwrec, stream->hdr_starttime,
406                                    (uint32_t*)&ar[12], (uint32_t*)&ar[16],
407                                    &msec_flags);
408 
409     /* extra TCP information */
410     is_tcp = (uint8_t)GET_MASKED_BITS(msec_flags, 10, 1);
411     prot_flags = (uint8_t)GET_MASKED_BITS(msec_flags, 0, 8);
412     rwpackUnpackProtoFlags(rwrec, is_tcp, prot_flags, ar[26], ar[27]);
413 
414     /* sensor, flow_type from file name/header */
415     rwRecSetSensor(rwrec, stream->hdr_sensor);
416     rwRecSetFlowType(rwrec, stream->hdr_flowtype);
417 
418     return SKSTREAM_OK;
419 }
420 
421 
422 /*
423  *  Pack the record 'rwrec' into an array of bytes 'ar'
424  */
425 static int
augmentedioRecordPack_V1(skstream_t * stream,const rwGenericRec_V5 * rwrec,uint8_t * ar)426 augmentedioRecordPack_V1(
427     skstream_t             *stream,
428     const rwGenericRec_V5  *rwrec,
429     uint8_t                *ar)
430 {
431     int rv = SKSTREAM_OK; /* return value */
432     uint32_t msec_flags;
433     uint8_t is_tcp, prot_flags;
434 
435     /* sTime, pkts, bytes, elapsed, proto, tcp-flags, bpp */
436     rv = rwpackPackTimeBytesPktsFlags((uint32_t*)&ar[12], (uint32_t*)&ar[16],
437                                       &msec_flags,
438                                       rwrec, stream->hdr_starttime);
439     if (rv) {
440         return rv;
441     }
442 
443     rwpackPackProtoFlags(&is_tcp, &prot_flags, &ar[26], &ar[27], rwrec);
444 
445     /* msec_flags: sTime_msec:10; elaps_msec:10; pflag:1;
446      *             is_tcp:1; pad:2; prot_flags:8; */
447     /* overwrite the least significant 11 bits */
448     msec_flags = ((msec_flags & (MASKARRAY_21 << 11))
449                   | (is_tcp ? (1 << 10) : 0)
450                   | prot_flags);
451 
452     /* sIP, dIP, sPort, dPort */
453     rwRecMemGetSIPv4(rwrec, &ar[0]);
454     rwRecMemGetDIPv4(rwrec, &ar[4]);
455     rwRecMemGetSPort(rwrec, &ar[8]);
456     rwRecMemGetDPort(rwrec, &ar[10]);
457 
458     /* msec_flags */
459     memcpy(&ar[20], &msec_flags, 4);
460 
461     /* application */
462     rwRecMemGetApplication(rwrec, &ar[24]);
463 
464     /* swap if required */
465     if (stream->swapFlag) {
466         augmentedioRecordSwap_V1(ar);
467     }
468 
469     return SKSTREAM_OK;
470 }
471 
472 
473 /* ********************************************************************* */
474 
475 /*
476  *  Return length of record of specified version, or 0 if no such
477  *  version exists.  See skstream_priv.h for details.
478  */
479 uint16_t
augmentedioGetRecLen(sk_file_version_t vers)480 augmentedioGetRecLen(
481     sk_file_version_t   vers)
482 {
483     switch (vers) {
484       case 1:
485         return RECLEN_RWAUGMENTED_V1;
486       case 2:
487         return RECLEN_RWAUGMENTED_V2;
488       case 3:
489         return RECLEN_RWAUGMENTED_V3;
490       case 4:
491         return RECLEN_RWAUGMENTED_V4;
492       case 5:
493         return RECLEN_RWAUGMENTED_V5;
494       default:
495         return 0;
496     }
497 }
498 
499 
500 /*
501  *  status = augmentedioPrepare(&stream);
502  *
503  *    Sets the record version to the default if it is unspecified,
504  *    checks that the record format supports the requested record
505  *    version, sets the record length, and sets the pack and unpack
506  *    functions for this record format and version.
507  */
508 int
augmentedioPrepare(skstream_t * stream)509 augmentedioPrepare(
510     skstream_t         *stream)
511 {
512 #define FILE_FORMAT "FT_RWAUGMENTED"
513     sk_file_header_t *hdr = stream->silk_hdr;
514     int rv = SKSTREAM_OK; /* return value */
515 
516     assert(skHeaderGetFileFormat(hdr) == FT_RWAUGMENTED);
517 
518     /* Set version if none was selected by caller */
519     if ((stream->io_mode == SK_IO_WRITE)
520         && (skHeaderGetRecordVersion(hdr) == SK_RECORD_VERSION_ANY))
521     {
522         skHeaderSetRecordVersion(hdr, DEFAULT_RECORD_VERSION);
523     }
524 
525     /* version check; set values based on version */
526     switch (skHeaderGetRecordVersion(hdr)) {
527       case 5:
528         stream->rwUnpackFn = &augmentedioRecordUnpack_V5;
529         stream->rwPackFn   = &augmentedioRecordPack_V5;
530         break;
531       case 4:
532         stream->rwUnpackFn = &augmentedioRecordUnpack_V4;
533         stream->rwPackFn   = &augmentedioRecordPack_V4;
534         break;
535       case 3:
536       case 2:
537       case 1:
538         /* V1 and V2 differ only in the padding of the header */
539         /* V2 and V3 differ only in that V3 supports compression on
540          * read and write; V2 supports compression only on read */
541         stream->rwUnpackFn = &augmentedioRecordUnpack_V1;
542         stream->rwPackFn   = &augmentedioRecordPack_V1;
543         break;
544       case 0:
545       default:
546         rv = SKSTREAM_ERR_UNSUPPORT_VERSION;
547         goto END;
548     }
549 
550     stream->recLen = augmentedioGetRecLen(skHeaderGetRecordVersion(hdr));
551 
552     /* verify lengths */
553     if (stream->recLen == 0) {
554         skAppPrintErr("Record length not set for %s version %u",
555                       FILE_FORMAT, (unsigned)skHeaderGetRecordVersion(hdr));
556         skAbort();
557     }
558     if (stream->recLen != skHeaderGetRecordLength(hdr)) {
559         if (0 == skHeaderGetRecordLength(hdr)) {
560             skHeaderSetRecordLength(hdr, stream->recLen);
561         } else {
562             skAppPrintErr(("Record length mismatch for %s version %u\n"
563                            "\tcode = %" PRIu16 " bytes;  header = %lu bytes"),
564                           FILE_FORMAT, (unsigned)skHeaderGetRecordVersion(hdr),
565                           stream->recLen,
566                           (unsigned long)skHeaderGetRecordLength(hdr));
567             skAbort();
568         }
569     }
570 
571   END:
572     return rv;
573 }
574 
575 
576 /*
577 ** Local Variables:
578 ** mode:c
579 ** indent-tabs-mode:nil
580 ** c-basic-offset:4
581 ** End:
582 */
583