1 /*
2 ** Copyright (C) 2007-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8 
9 /*
10  *  skipfix.c
11  *
12  *    This file and ipfixsource.c are tightly coupled, and together
13  *    they read IPFIX records and convert them to SiLK flow records.
14  *
15  *    This file primarly handles the conversion, and it is where the
16  *    reading functions exist.
17  *
18  *    The ipfixsource.c file is primary about setting up and tearing
19  *    down the data structures used when processing IPFIX.
20  */
21 
22 #define SKIPFIX_SOURCE 1
23 #include <silk/silk.h>
24 
25 RCSIDENT("$SiLK: skipfix.c 7af5eab585e4 2020-04-15 15:56:48Z mthomas $");
26 
27 #include "ipfixsource.h"
28 #include <silk/skipaddr.h>
29 #include <silk/skthread.h>
30 
31 #ifdef  SKIPFIX_TRACE_LEVEL
32 #define TRACEMSG_LEVEL SKIPFIX_TRACE_LEVEL
33 #endif
34 #define TRACEMSG(lvl, msg)      TRACEMSG_TO_TRACEMSGLVL(lvl, msg)
35 #include <silk/sktracemsg.h>
36 
37 
38 /* LOCAL DEFINES AND TYPEDEFS */
39 
40 /*
41  *    Whether to process the subTemplateList element of the Tombstone
42  *    record ski_tombstone_t.
43  */
44 #ifndef SKIPFIX_ENABLE_TOMBSTONE_TIMES
45 #  define SKIPFIX_ENABLE_TOMBSTONE_TIMES 1
46 #endif
47 
48 /*
49  *    A context is added to incoming templates to assist when decoding
50  *    records.  The context is a 32-bit bitmap, and the following
51  *    determines whether to allocate the bitmap or to use the pointer
52  *    itself and cast the pointer to a uintptr_t.
53  */
54 #ifndef SKIPFIX_ALLOCATE_BITMAP
55 #  if SK_SIZEOF_UINTPTR_T >= 4
56 #    define SKIPFIX_ALLOCATE_BITMAP 0
57 #  else
58 #    define SKIPFIX_ALLOCATE_BITMAP 1
59 #  endif
60 #endif
61 
62 #if SKIPFIX_ALLOCATE_BITMAP
63 #  define BMAP_TYPE  uint32_t
64 #  define BMAP_PRI   "%#010" PRIx32
65 #  define BMAP_TMPL_CTX_GET(m_template)                 \
66     ((fbTemplateGetContext(m_template) != NULL)         \
67      ? *((BMAP_TYPE *)fbTemplateGetContext(m_template)) \
68      : 0)
69 #  define BMAP_TMPL_CTX_SET(m_ctx, m_free_fn, m_bmap)                   \
70     do {                                                                \
71         BMAP_TYPE *bmapp = (BMAP_TYPE *)malloc(sizeof(BMAP_TYPE));      \
72         if (bmapp) {                                                    \
73             *bmapp = (m_bmap);                                          \
74             *(m_ctx) = bmapp;                                           \
75             *(m_free_fn) = free;                                        \
76         }                                                               \
77     } while(0)
78 #else
79 #  define BMAP_TYPE  uintptr_t
80 #  ifdef PRIxPTR
81 #    define BMAP_PRI "%#010" PRIxPTR
82 #  elif SK_SIZEOF_UINTPTR_T > 4
83 #    define BMAP_PRI "%#010" PRIx64
84 #  else
85 #    define BMAP_PRI "%#010" PRIx32
86 #  endif
87 #  define BMAP_TMPL_CTX_GET(m_template)         \
88     (BMAP_TYPE)fbTemplateGetContext(m_template)
89 #  define BMAP_TMPL_CTX_SET(m_ctx, m_free_fn, m_bmap)   \
90     do {                                                \
91         *(m_ctx) = (void*)(m_bmap);                     \
92         *(m_free_fn) = NULL;                            \
93     } while(0)
94 #endif  /* SKIPFIX_ALLOCATE_BITMAP */
95 
96 
97 /*
98  *    If 'm_val' is greater then 'm_max', return 'm_max'.  Otherwise,
99  *    return 'm_val' masked by 'm_max'.  (m_max & m_val).
100  */
101 #define CLAMP_VAL(m_val, m_max)                                 \
102     (((m_val) > (m_max)) ? (m_max) : ((m_max) & (m_val)))
103 #define CLAMP_VAL16(m_val)   CLAMP_VAL((m_val), UINT16_MAX)
104 #define CLAMP_VAL32(m_val)   CLAMP_VAL((m_val), UINT32_MAX)
105 
106 /* One more than UINT32_MAX */
107 #define ROLLOVER32 ((intmax_t)UINT32_MAX + 1)
108 
109 /*
110  *    For NetFlow V9, when the absolute value of the magnitude of the
111  *    difference between the sysUpTime and the flowStartSysUpTime is
112  *    greater than this value (in milliseconds), assume one of the
113  *    values has rolled over.
114  */
115 #define MAXIMUM_FLOW_TIME_DEVIATION  ((intmax_t)INT32_MAX)
116 
117 
118 /* Values for the flowEndReason. this first set is defined by the
119  * IPFIX spec */
120 #define SKI_END_IDLE            1
121 #define SKI_END_ACTIVE          2
122 #define SKI_END_CLOSED          3
123 #define SKI_END_FORCED          4
124 #define SKI_END_RESOURCE        5
125 
126 /* SiLK will ignore flows with a flowEndReason of
127  * SKI_END_YAF_INTERMEDIATE_FLOW */
128 #define SKI_END_YAF_INTERMEDIATE_FLOW 0x1F
129 
130 /* Mask for the values of flowEndReason: want to ignore the next bit */
131 #define SKI_END_MASK            0x1f
132 
133 /* Bits from flowEndReason: whether flow is a continuation */
134 #define SKI_END_ISCONT          0x80
135 
136 /* Bits from flowAttributes */
137 #define SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE 0x01
138 
139 
140 /*  **********  Template Bitmap to Record Type  **********  */
141 
142 /*
143  *    Each IPFIX Template is examined by skiTemplateCallbackCtx() when
144  *    it is initially received to determine how to process data
145  *    represented by the template.
146  *
147  *    This up-front processing should reduce the overhead of needing
148  *    to examining the content of every record, but at the expenses of
149  *    looking at elements or templates which may never be used.
150  *    Overall this should be a benefit as long as the number of
151  *    records received is much higher than the number of templates
152  *    received (in the TCP case, the templates are only sent once).
153  *
154  *    When skiTemplateCallbackCtx() examines a Data Template (that is,
155  *    a non-Options Template), it may determine that a specialized
156  *    function should be used to read the data.  The lower 3 bits of
157  *    the bitmap detmine whether it was able to determine this, as
158  *    explained in the next paragraphs:
159  *
160  *    xx1. If the least significant bit is high, the general purpose
161  *    ski_fixrec_next() function is used to read the data, and the
162  *    other 31 bits are an indication what the template contains.
163  *
164  *    x10. If the two LSB are 10, the data looks like NetFlow v9 and
165  *    the ski_nf9rec_next() function is used to read the data.  See
166  *    the note below.
167  *
168  *    100. If the three LSB are 100, the data looks like YAF or SiLK
169  *    data and the ski_yafrec_next() function is used to read the
170  *    data.  See the note below.
171  *
172  *    NOTE: For the NetFlow v9 and YAF/SiLK data, the lower 16 bits of
173  *    the bitmap represent the template ID that should be used to
174  *    process the data.  The upper 16 bits provide other bits to
175  *    represent what may be present in the template.
176  *
177  *    The bitmap is used to determine the ski_rectype_t value that is
178  *    returned by ski_rectype_next().
179  */
180 
181 #define BMAP_RECTYPE_MASK       0x00000007
182 
183 #define BMAP_RECTYPE_YAFREC     0x00000004
184 
185 #define BMAP_RECTYPE_NF9REC     0x00000002
186 
187 #define BMAP_RECTYPE_FIXREC     0x00000001
188 
189 
190 /*
191  *  **********  Determining What IEs a Template Contains  **********
192  *
193  *    The following TMPL_BIT_ieFoo macros are for setting and getting
194  *    the bit associated with the ieFoo element in the bitmap
195  *    associated with the template.
196  */
197 #define TMPL_BIT_flowStartMilliseconds          (UINT32_C(1) <<  1)
198 #define TMPL_BIT_flowEndMilliseconds            (UINT32_C(1) <<  2)
199 
200 #define TMPL_BIT_systemInitTimeMilliseconds     (UINT32_C(1) <<  3)
201 #define TMPL_BIT_flowStartSysUpTime             (UINT32_C(1) <<  4)
202 
203 #define TMPL_BIT_flowStartMicroseconds          (UINT32_C(1) <<  5)
204 #define TMPL_BIT_flowEndMicroseconds            (UINT32_C(1) <<  6)
205 
206 #define TMPL_BIT_flowStartNanoseconds           (UINT32_C(1) <<  7)
207 #define TMPL_BIT_flowEndNanoseconds             (UINT32_C(1) <<  8)
208 
209 #define TMPL_BIT_flowStartSeconds               (UINT32_C(1) <<  9)
210 #define TMPL_BIT_flowEndSeconds                 (UINT32_C(1) << 10)
211 
212 #define TMPL_BIT_flowStartDeltaMicroseconds     (UINT32_C(1) << 11)
213 #define TMPL_BIT_flowEndDeltaMicroseconds       (UINT32_C(1) << 12)
214 
215 #define TMPL_BIT_flowDurationMicroseconds       (UINT32_C(1) << 13)
216 #define TMPL_BIT_flowDurationMilliseconds       (UINT32_C(1) << 14)
217 
218 /*  either sourceIPv4Address or destinationIPv4Address */
219 #define TMPL_BIT_sourceIPv4Address              (UINT32_C(1) << 15)
220 /*  either sourceIPv6Address or destinationIPv6Address */
221 #define TMPL_BIT_sourceIPv6Address              (UINT32_C(1) << 16)
222 
223 #define TMPL_BIT_firewallEvent                  (UINT32_C(1) << 17)
224 #define TMPL_BIT_NF_F_FW_EVENT                  (UINT32_C(1) << 18)
225 #define TMPL_BIT_NF_F_FW_EXT_EVENT              (UINT32_C(1) << 19)
226 
227 #define TMPL_BIT_collectionTimeMilliseconds     (UINT32_C(1) << 20)
228 #define TMPL_BIT_observationTimeMicroseconds    (UINT32_C(1) << 21)
229 #define TMPL_BIT_observationTimeMilliseconds    (UINT32_C(1) << 22)
230 #define TMPL_BIT_observationTimeNanoseconds     (UINT32_C(1) << 23)
231 #define TMPL_BIT_observationTimeSeconds         (UINT32_C(1) << 24)
232 
233 /*  either icmpTypeCodeIPv4 or icmpTypeCodeIPv6 */
234 #define TMPL_BIT_icmpTypeCodeIPv4               (UINT32_C(1) << 25)
235 /*  at least one of icmpTypeIPv4, icmpCodeIPv4, icmpTypeIPv6, icmpCodeIPv6 */
236 #define TMPL_BIT_icmpTypeIPv4                   (UINT32_C(1) << 26)
237 
238 #define TMPL_BIT_postVlanId                     (UINT32_C(1) << 27)
239 #define TMPL_BIT_reverseVlanId                  (UINT32_C(1) << 28)
240 
241 #define TMPL_BIT_reverseInitialTCPFlags         (UINT32_C(1) << 29)
242 #define TMPL_BIT_reverseTcpControlBits          (UINT32_C(1) << 30)
243 
244 
245 /*
246  *    The following are not stored on the bitmap that is set as the
247  *    template's context, but they are used when the template is
248  *    examined initially.
249  */
250 
251 /*  octetDeltaCount */
252 #define TMPL_BIT_octetDeltaCount                (UINT64_C(1) << 32)
253 /*  packetDeltaCount */
254 #define TMPL_BIT_packetDeltaCount               (UINT64_C(1) << 33)
255 /*  octetTotalCount */
256 #define TMPL_BIT_octetTotalCount                (UINT64_C(1) << 34)
257 /*  packetTotalCount */
258 #define TMPL_BIT_packetTotalCount               (UINT64_C(1) << 35)
259 /*  either initiatorOctets or initiatorPackets */
260 #define TMPL_BIT_initiatorOctets                (UINT64_C(1) << 36)
261 /*  either responderOctets or responderPackets */
262 #define TMPL_BIT_responderOctets                (UINT64_C(1) << 37)
263 /*  either reverseOctetDeltaCount or reversePacketDeltaCount */
264 #define TMPL_BIT_reverseOctetDeltaCount         (UINT64_C(1) << 38)
265 #define TMPL_BIT_initialTCPFlags                (UINT64_C(1) << 39)
266 #define TMPL_BIT_reverseFlowDeltaMilliseconds   (UINT64_C(1) << 40)
267 #define TMPL_BIT_subTemplateMultiList           (UINT64_C(1) << 41)
268 /*  either postOctetDeltaCount or postPacketDeltaCount */
269 #define TMPL_BIT_postOctetDeltaCount            (UINT64_C(1) << 42)
270 /*  either postOctetTotalCount or postPacketTotalCount */
271 #define TMPL_BIT_postOctetTotalCount            (UINT64_C(1) << 43)
272 /*  certToolId (YAF 2.11) */
273 #define TMPL_BIT_certToolId                     (UINT64_C(1) << 44)
274 /*  exportingProcessId and observationTimeSeconds are the
275  *  tombstone_access values for YAF 2.10 */
276 #define TMPL_BIT_exportingProcessId             (UINT64_C(1) << 45)
277 
278 /* The following are only checked in options templates, so the bit
279  * position here can repeat those above */
280 /* both IE49,IE50 (samplerMode, samplerRandomInterval) are present */
281 #define TMPL_BIT_samplerMode                    (UINT32_C(1) <<  1)
282 /* both IE35,IE34 (samplingAlgorithm, samplingInterval) are present */
283 #define TMPL_BIT_samplingAlgorithm              (UINT32_C(1) <<  2)
284 #define TMPL_BIT_flowTableFlushEventCount       (UINT32_C(1) <<  3)
285 #define TMPL_BIT_flowTablePeakCount             (UINT32_C(1) <<  4)
286 #define TMPL_BIT_tombstoneId                    (UINT32_C(1) <<  5)
287 
288 /*
289  *    Groupings of various bits.
290  */
291 
292 #define TMPL_MASK_GAUNTLET_OF_TIME              \
293     (TMPL_BIT_collectionTimeMilliseconds   |    \
294      TMPL_BIT_flowDurationMicroseconds     |    \
295      TMPL_BIT_flowDurationMilliseconds     |    \
296      TMPL_BIT_flowEndDeltaMicroseconds     |    \
297      TMPL_BIT_flowEndMicroseconds          |    \
298      TMPL_BIT_flowEndMilliseconds          |    \
299      TMPL_BIT_flowEndNanoseconds           |    \
300      TMPL_BIT_flowEndSeconds               |    \
301      TMPL_BIT_flowStartDeltaMicroseconds   |    \
302      TMPL_BIT_flowStartMicroseconds        |    \
303      TMPL_BIT_flowStartMilliseconds        |    \
304      TMPL_BIT_flowStartNanoseconds         |    \
305      TMPL_BIT_flowStartSeconds             |    \
306      TMPL_BIT_flowStartSysUpTime           |    \
307      TMPL_BIT_observationTimeMicroseconds  |    \
308      TMPL_BIT_observationTimeMilliseconds  |    \
309      TMPL_BIT_observationTimeNanoseconds   |    \
310      TMPL_BIT_observationTimeSeconds       |    \
311      TMPL_BIT_systemInitTimeMilliseconds)
312 
313 #define TMPL_MASK_IPADDRESS                     \
314     (TMPL_BIT_sourceIPv4Address |               \
315      TMPL_BIT_sourceIPv6Address)
316 
317 #define TMPL_MASK_VOLUME_YAF                    \
318     (TMPL_BIT_octetTotalCount  |                \
319      TMPL_BIT_packetTotalCount |                \
320      TMPL_BIT_octetDeltaCount  |                \
321      TMPL_BIT_packetDeltaCount)
322 
323 #define TMPL_MASK_TIME_MILLI_YAF                \
324     (TMPL_BIT_flowStartMilliseconds |           \
325      TMPL_BIT_flowEndMilliseconds)
326 
327 #define TMPL_MASK_VOLUME_NF9                    \
328     (TMPL_BIT_octetDeltaCount     |             \
329      TMPL_BIT_packetDeltaCount    |             \
330      TMPL_BIT_octetTotalCount     |             \
331      TMPL_BIT_packetTotalCount    |             \
332      TMPL_BIT_initiatorOctets     |             \
333      TMPL_BIT_responderOctets     |             \
334      TMPL_BIT_postOctetDeltaCount |             \
335      TMPL_BIT_postOctetTotalCount)
336 
337 #define TMPL_MASK_TIME_SYSUP                    \
338     (TMPL_BIT_systemInitTimeMilliseconds |      \
339      TMPL_BIT_flowStartSysUpTime)
340 
341 #define TMPL_MASK_TIME_MILLI_NF9                \
342     (TMPL_BIT_flowStartMilliseconds |           \
343      TMPL_BIT_observationTimeMilliseconds)
344 
345 #define TMPL_MASK_TIME_NF9                      \
346     (TMPL_MASK_TIME_SYSUP |                     \
347      TMPL_MASK_TIME_MILLI_NF9)
348 
349 #define TMPL_MASK_YAFREC                        \
350     (TMPL_MASK_IPADDRESS |                      \
351      TMPL_MASK_VOLUME_YAF |                     \
352      TMPL_MASK_TIME_MILLI_YAF |                 \
353      TMPL_BIT_reverseOctetDeltaCount |          \
354      TMPL_BIT_initialTCPFlags |                 \
355      TMPL_BIT_icmpTypeCodeIPv4 |                \
356      TMPL_BIT_reverseVlanId |                   \
357      TMPL_BIT_reverseInitialTCPFlags |          \
358      TMPL_BIT_reverseTcpControlBits |           \
359      TMPL_BIT_reverseFlowDeltaMilliseconds |    \
360      TMPL_BIT_subTemplateMultiList)
361 
362 #define TMPL_MASK_NF9REC                        \
363     (TMPL_MASK_IPADDRESS |                      \
364      TMPL_MASK_VOLUME_NF9 |                     \
365      TMPL_MASK_TIME_NF9 |                       \
366      TMPL_BIT_icmpTypeCodeIPv4 |                \
367      TMPL_BIT_icmpTypeIPv4 |                    \
368      TMPL_BIT_postVlanId |                      \
369      TMPL_BIT_firewallEvent |                   \
370      TMPL_BIT_NF_F_FW_EVENT |                   \
371      TMPL_BIT_NF_F_FW_EXT_EVENT)
372 
373 #define ASSERT_IE_NAME_IS(aini_ie, aini_name)                           \
374     assert(TMPL_BIT_ ## aini_name                                       \
375            && 0==strcmp((aini_ie)->ref.canon->ref.name, #aini_name))
376 
377 
378 /*
379  *  **********  "Give Me Everything" Template for Import  **********
380  *
381  *    This is the template and a matching struct used for reading
382  *    generic flow records.  The template and struct are used by
383  *    ski_fixrec_next() when reading data.
384  *
385  *    The template contains all the IPFIX fields that SiLK supports
386  *    when importing data.
387  *
388  *    The type for these records is SKI_RECTYPE_FIXREC
389  */
390 
391 #define SKI_FIXREC_TID          0xAFEB
392 
393 #define SKI_FIXREC_PADDING  2
394 
395 static fbInfoElementSpec_t ski_fixrec_spec[] = {
396     /* Ports, Protocol */
397     { (char*)"sourceTransportPort",                2, 0 },
398     { (char*)"destinationTransportPort",           2, 0 },
399     { (char*)"protocolIdentifier",                 1, 0 },
400     /* TCP Flags (reverse values below) */
401     { (char*)"tcpControlBits",                     1, 0 },
402     { (char*)"initialTCPFlags",                    1, 0 },
403     { (char*)"unionTCPFlags",                      1, 0 },
404     /* Router interfaces */
405     { (char*)"ingressInterface",                   4, 0 },
406     { (char*)"egressInterface",                    4, 0 },
407     /* Volume, as Delta (reverse values below) */
408     { (char*)"packetDeltaCount",                   8, 0 },
409     { (char*)"octetDeltaCount",                    8, 0 },
410     /* Volume, as Total (reverse values below) */
411     { (char*)"packetTotalCount",                   8, 0 },
412     { (char*)"octetTotalCount",                    8, 0 },
413     /* Volume, yet more */
414     { (char*)"initiatorPackets",                   8, 0 },
415     { (char*)"initiatorOctets",                    8, 0 },
416     { (char*)"responderPackets",                   8, 0 },
417     { (char*)"responderOctets",                    8, 0 },
418     /* Flow attributes (reverse value below) */
419     { (char*)"flowAttributes",                     2, 0 },
420     /* SiLK Fields */
421     { (char*)"silkAppLabel",                       2, 0 },
422     { (char*)"silkFlowSensor",                     2, 0 },
423     { (char*)"silkFlowType",                       1, 0 },
424     { (char*)"silkTCPState",                       1, 0 },
425     /* Vlan IDs */
426     { (char*)"vlanId",                             2, 0 },
427     { (char*)"postVlanId",                         2, 0 },
428     /* Firewall events */
429     { (char*)"firewallEvent",                      1, 0 },
430     { (char*)"NF_F_FW_EVENT",                      1, 0 },
431     { (char*)"NF_F_FW_EXT_EVENT",                  2, 0 },
432     /* ICMP */
433     { (char*)"icmpTypeCodeIPv4",                   2, 0 },
434     { (char*)"icmpTypeIPv4",                       1, 0 },
435     { (char*)"icmpCodeIPv4",                       1, 0 },
436     { (char*)"icmpTypeCodeIPv6",                   2, 0 },
437     { (char*)"icmpTypeIPv6",                       1, 0 },
438     { (char*)"icmpCodeIPv6",                       1, 0 },
439     /* Millisecond start and end (epoch) (native time) */
440     { (char*)"flowStartMilliseconds",              8, 0 },
441     { (char*)"flowEndMilliseconds",                8, 0 },
442     /* SysUpTime, used to handle Netflow v9 SysUpTime offset times */
443     { (char*)"systemInitTimeMilliseconds",         8, 0 },
444     { (char*)"flowStartSysUpTime",                 4, 0 },
445     { (char*)"flowEndSysUpTime",                   4, 0 },
446     /* Microsecond start and end (RFC1305-style) */
447     { (char*)"flowStartMicroseconds",              8, 0 },
448     { (char*)"flowEndMicroseconds",                8, 0 },
449     /* Nanosecond start and end (RFC1305-style) */
450     { (char*)"flowStartNanoseconds",               8, 0 },
451     { (char*)"flowEndNanoseconds",                 8, 0 },
452     /* Second start and end */
453     { (char*)"flowStartSeconds",                   4, 0 },
454     { (char*)"flowEndSeconds",                     4, 0 },
455     /* Microsecond delta start and end */
456     { (char*)"flowStartDeltaMicroseconds",         4, 0 },
457     { (char*)"flowEndDeltaMicroseconds",           4, 0 },
458     /* Flow durations */
459     { (char*)"flowDurationMicroseconds",           4, 0 },
460     { (char*)"flowDurationMilliseconds",           4, 0 },
461     /* Collection time and Observation time */
462     { (char*)"collectionTimeMilliseconds",         8, 0 },
463     { (char*)"observationTimeMilliseconds",        8, 0 },
464     { (char*)"observationTimeMicroseconds",        8, 0 },
465     { (char*)"observationTimeNanoseconds",         8, 0 },
466     { (char*)"observationTimeSeconds",             4, 0 },
467     /* IPv4 Addresses */
468     { (char*)"sourceIPv4Address",                  4, 0 },
469     { (char*)"destinationIPv4Address",             4, 0 },
470     { (char*)"ipNextHopIPv4Address",               4, 0 },
471     /* IPv6 Addresses */
472     { (char*)"sourceIPv6Address",                 16, 0 },
473     { (char*)"destinationIPv6Address",            16, 0 },
474     { (char*)"ipNextHopIPv6Address",              16, 0 },
475     /* Volumes as flow leaves the router or middlebox */
476     { (char*)"postPacketDeltaCount",               8, 0 },
477     { (char*)"postOctetDeltaCount",                8, 0 },
478     { (char*)"postPacketTotalCount",               8, 0 },
479     { (char*)"postOctetTotalCount",                8, 0 },
480     /* End reason */
481     { (char*)"flowEndReason",                      1, 0 },
482     /* TCP Flags (reverse) */
483     { (char*)"reverseTcpControlBits",              1, 0 },
484     { (char*)"reverseInitialTCPFlags",             1, 0 },
485     { (char*)"reverseUnionTCPFlags",               1, 0 },
486     /* Initial packet roundtrip */
487     { (char*)"reverseFlowDeltaMilliseconds",       4, 0 },
488     /* Volume, as Delta (reverse) */
489     { (char*)"reversePacketDeltaCount",            8, 0 },
490     { (char*)"reverseOctetDeltaCount",             8, 0 },
491     /* Volume, as Total (reverse) */
492     { (char*)"reversePacketTotalCount",            8, 0 },
493     { (char*)"reverseOctetTotalCount",             8, 0 },
494     /* Vlan IDs (reverse) */
495     { (char*)"reverseVlanId",                      2, 0 },
496     { (char*)"reversePostVlanId",                  2, 0 },
497     /* Flow attributes (reverse) */
498     { (char*)"reverseFlowAttributes",              2, 0 },
499 #if SKI_FIXREC_PADDING != 0
500     { (char*)"paddingOctets",     SKI_FIXREC_PADDING, 0 },
501 #endif
502     { (char*)"subTemplateMultiList",               0, 0 },
503     FB_IESPEC_NULL
504 };
505 
506 typedef struct ski_fixrec_st {
507     uint16_t        sourceTransportPort;            /*   0-  1 */
508     uint16_t        destinationTransportPort;       /*   2-  3 */
509 
510     uint8_t         protocolIdentifier;             /*   4     */
511     uint8_t         tcpControlBits;                 /*   5     */
512     uint8_t         initialTCPFlags;                /*   6     */
513     uint8_t         unionTCPFlags;                  /*   7     */
514 
515     uint32_t        ingressInterface;               /*   8- 11 */
516     uint32_t        egressInterface;                /*  12- 15 */
517 
518     uint64_t        packetDeltaCount;               /*  16- 23 */
519     uint64_t        octetDeltaCount;                /*  24- 31 */
520 
521     uint64_t        packetTotalCount;               /*  32- 39 */
522     uint64_t        octetTotalCount;                /*  40- 47 */
523 
524     uint64_t        initiatorPackets;               /*  48- 55 */
525     uint64_t        initiatorOctets;                /*  56- 63 */
526 
527     uint64_t        responderPackets;               /*  64- 71 */
528     uint64_t        responderOctets;                /*  72- 79 */
529 
530     /* Flow attribute flags (reverse value below) */
531     uint16_t        flowAttributes;                 /*  80- 81 */
532 
533     /* SiLK valuee */
534     uint16_t        silkAppLabel;                   /*  82- 83 */
535     uint16_t        silkFlowSensor;                 /*  84- 85 */
536     uint8_t         silkFlowType;                   /*  86     */
537     uint8_t         silkTCPState;                   /*  87     */
538 
539     /* vlan IDs (reverse values below) */
540     uint16_t        vlanId;                         /*  88- 89 */
541     uint16_t        postVlanId;                     /*  90- 91 */
542 
543     /* Firewall events */
544     uint8_t         firewallEvent;                  /*  92     */
545     uint8_t         NF_F_FW_EVENT;                  /*  93     */
546     uint16_t        NF_F_FW_EXT_EVENT;              /*  94- 95 */
547 
548     /* ICMP */
549     uint16_t        icmpTypeCodeIPv4;               /*  96- 97 */
550     uint8_t         icmpTypeIPv4;                   /*  98     */
551     uint8_t         icmpCodeIPv4;                   /*  99     */
552     uint16_t        icmpTypeCodeIPv6;               /* 100-101 */
553     uint8_t         icmpTypeIPv6;                   /* 102     */
554     uint8_t         icmpCodeIPv6;                   /* 103     */
555 
556     /* Time can be represented in many different formats: */
557     uint64_t        flowStartMilliseconds;          /* 104-111 */
558     uint64_t        flowEndMilliseconds;            /* 112-119 */
559 
560     /* SysUpTime: used for flow{Start,End}SysUpTime calculations.
561      * Needed to support Netflow v9 in particular. */
562     uint64_t        systemInitTimeMilliseconds;     /* 120-127 */
563 
564     /* Start and end time as delta from the system init time.  Needed
565      * to support Netflow v9. */
566     uint32_t        flowStartSysUpTime;             /* 128-131 */
567     uint32_t        flowEndSysUpTime;               /* 132-135 */
568 
569     /* start time as NTP microseconds (RFC1305); may either have end
570      * Time in same format or as an flowDurationMicroseconds value. */
571     uint64_t        flowStartMicroseconds;          /* 136-143 */
572     uint64_t        flowEndMicroseconds;            /* 144-151 */
573 
574     /* start time as NTP nanoseconds (RFC1305) */
575     uint64_t        flowStartNanoseconds;           /* 152-159 */
576     uint64_t        flowEndNanoseconds;             /* 160-167 */
577 
578     /* start time and end times as seconds since UNIX epoch. no
579      * flowDuration field */
580     uint32_t        flowStartSeconds;               /* 168-171 */
581     uint32_t        flowEndSeconds;                 /* 172-175 */
582 
583     /* start time as delta (negative microsec offsets) from the export
584      * time; may either have end time in same format or a
585      * flowDurationMicroseconds value */
586     uint32_t        flowStartDeltaMicroseconds;     /* 176-179 */
587     uint32_t        flowEndDeltaMicroseconds;       /* 180-183 */
588 
589     /* elapsed time as either microsec or millisec.  used when the
590      * flowEnd time is not given. */
591     uint32_t        flowDurationMicroseconds;       /* 184-187 */
592     uint32_t        flowDurationMilliseconds;       /* 188-191 */
593 
594     /* Collection time and Observation time */
595     uint64_t        collectionTimeMilliseconds;     /* 192-199 */
596     uint64_t        observationTimeMilliseconds;    /* 200-207 */
597     uint64_t        observationTimeMicroseconds;    /* 208-215 */
598     uint64_t        observationTimeNanoseconds;     /* 216-223 */
599     uint32_t        observationTimeSeconds;         /* 224-227 */
600 
601     uint32_t        sourceIPv4Address;              /* 228-231 */
602     uint32_t        destinationIPv4Address;         /* 232-235 */
603     uint32_t        ipNextHopIPv4Address;           /* 236-239 */
604 
605     uint8_t         sourceIPv6Address[16];          /* 240-255 */
606     uint8_t         destinationIPv6Address[16];     /* 256-271 */
607     uint8_t         ipNextHopIPv6Address[16];       /* 272-287 */
608 
609     /* egress volume; used when ingress volume is 0 */
610     uint64_t        postPacketDeltaCount;           /* 288-295 */
611     uint64_t        postOctetDeltaCount;            /* 296-303 */
612     uint64_t        postPacketTotalCount;           /* 304-311 */
613     uint64_t        postOctetTotalCount;            /* 312-319 */
614 
615     uint8_t         flowEndReason;                  /* 320     */
616 
617     /* Flags for the reverse flow: */
618     uint8_t         reverseTcpControlBits;          /* 321     */
619     uint8_t         reverseInitialTCPFlags;         /* 322     */
620     uint8_t         reverseUnionTCPFlags;           /* 323     */
621 
622     /* start time of reverse flow, as millisec offset from start time
623      * of forward flow */
624     uint32_t        reverseFlowDeltaMilliseconds;   /* 324-327 */
625 
626     uint64_t        reversePacketDeltaCount;        /* 328-335 */
627     uint64_t        reverseOctetDeltaCount;         /* 336-343 */
628     uint64_t        reversePacketTotalCount;        /* 344-351 */
629     uint64_t        reverseOctetTotalCount;         /* 352-359 */
630 
631     /* vlan IDs (reverse) */
632     uint16_t        reverseVlanId;                  /* 360-361 */
633     uint16_t        reversePostVlanId;              /* 362-363 */
634 
635     /* Flow attribute flags (reverse) */
636     uint16_t        reverseFlowAttributes;          /* 364-365 */
637 
638     /* padding */
639 #if SKI_FIXREC_PADDING != 0
640     uint8_t         paddingOctets[SKI_FIXREC_PADDING];/* 366-367 */
641 #endif
642 
643     /* TCP flags from yaf (when it is run without --silk) */
644     fbSubTemplateMultiList_t stml;
645 } ski_fixrec_t;
646 
647 
648 /*
649  *  **********  YAF/SiLK Template for Import  **********
650  *
651  *    These are templates used for reading records we know are coming
652  *    from SiLK or YAF.  The templates and struct are used
653  *    by ski_yafrec_next() when reading data.
654  *
655  *    The type for these records is SKI_RECTYPE_YAFREC
656  */
657 
658 /*
659  *    There are several templates defined here.  The following
660  *    YAFREC_* macros determine which elements in the
661  *    ski_yafrec_spec[] are used.  For the template that uses the
662  *    elements, the correspong bits are set to high in the
663  *    SKI_YAFREC_TID below.
664  *
665  *    On SKI_YAFREC_TID, high bits are 2, 12, 13, 14
666  */
667 #define YAFREC_UNI          (1 <<  3)
668 #define YAFREC_BI           (1 <<  4)
669 
670 #define YAFREC_ONLY_IP4     (1 <<  5)
671 #define YAFREC_ONLY_IP6     (1 <<  6)
672 #define YAFREC_IP_BOTH      (1 <<  7)
673 
674 #define YAFREC_DELTA        (1 <<  8)
675 #define YAFREC_TOTAL        (1 <<  9)
676 
677 #define YAFREC_STML         (1 << 10)
678 
679 #define SKI_YAFREC_TID      0x7004
680 #if (SKI_YAFREC_TID & BMAP_RECTYPE_MASK) != BMAP_RECTYPE_YAFREC
681 #error "bad SKI_YAFREC_TID value"
682 #endif
683 
684 static fbInfoElementSpec_t ski_yafrec_spec[] = {
685     /* Ports, Protocol */
686     { (char*)"sourceTransportPort",                2, 0 },
687     { (char*)"destinationTransportPort",           2, 0 },
688     { (char*)"protocolIdentifier",                 1, 0 },
689     /* TCP Flags (reverse values below) */
690     { (char*)"tcpControlBits",                     1, 0 },
691     { (char*)"initialTCPFlags",                    1, 0 },
692     { (char*)"unionTCPFlags",                      1, 0 },
693     /* Router interfaces */
694     { (char*)"ingressInterface",                   4, 0 },
695     { (char*)"egressInterface",                    4, 0 },
696     /* Volume, as either Delta or Total */
697     { (char*)"packetDeltaCount",                   8, YAFREC_DELTA },
698     { (char*)"octetDeltaCount",                    8, YAFREC_DELTA },
699     { (char*)"packetTotalCount",                   8, YAFREC_TOTAL },
700     { (char*)"octetTotalCount",                    8, YAFREC_TOTAL },
701     /* Flow attributes (reverse value below) */
702     { (char*)"flowAttributes",                     2, 0 },
703     /* SiLK Fields */
704     { (char*)"silkAppLabel",                       2, 0 },
705     { (char*)"silkFlowSensor",                     2, 0 },
706     { (char*)"silkFlowType",                       1, 0 },
707     { (char*)"silkTCPState",                       1, 0 },
708     /* Millisecond start and end (epoch) (native time) */
709     { (char*)"flowStartMilliseconds",              8, 0 },
710     { (char*)"flowEndMilliseconds",                8, 0 },
711     /* Vlan IDs */
712     { (char*)"vlanId",                             2, 0 },
713     { (char*)"postVlanId",                         2, 0 },
714     /* ICMP, either IPv4 or IPv6 */
715     { (char*)"icmpTypeCodeIPv4",                   2, YAFREC_IP_BOTH },
716     { (char*)"icmpTypeCodeIPv4",                   2, YAFREC_ONLY_IP4 },
717     { (char*)"icmpTypeCodeIPv6",                   2, YAFREC_ONLY_IP6 },
718     /* End reason */
719     { (char*)"flowEndReason",                      1, 0 },
720     /* TOS */
721     { (char*)"ipClassOfService",                   1, 0 },
722     /* IPv4 Addresses; if no IPv4 addresses, add 16 bytes of padding */
723     { (char*)"paddingOctets",                     16, YAFREC_ONLY_IP6 },
724     { (char*)"sourceIPv4Address",                  4, YAFREC_IP_BOTH },
725     { (char*)"destinationIPv4Address",             4, YAFREC_IP_BOTH },
726     { (char*)"ipNextHopIPv4Address",               4, YAFREC_IP_BOTH },
727     { (char*)"sourceIPv4Address",                  4, YAFREC_ONLY_IP4 },
728     { (char*)"destinationIPv4Address",             4, YAFREC_ONLY_IP4 },
729     { (char*)"ipNextHopIPv4Address",               4, YAFREC_ONLY_IP4 },
730     /* If both IPv4 and IPv6 addresses are present, add 4 bytes of
731      * padding.  If this is an IPv4 bi-flow, add 4 bytes of padding
732      * and 48 bytes of padding to skip the IPv6 addresess.  If an
733      * IPv4-uniflow, this is the end of the record, and pad to the
734      * end. */
735     { (char*)"paddingOctets",                      4, YAFREC_IP_BOTH },
736     { (char*)"paddingOctets",                     52, (YAFREC_ONLY_IP4
737                                                        | YAFREC_BI) },
738     { (char*)"paddingOctets",                     84, (YAFREC_ONLY_IP4
739                                                        | YAFREC_UNI) },
740     /* Ingore the IPv6 addresses for an IPv4 bi-flow */
741     /* IPv6 Addresses */
742     { (char*)"sourceIPv6Address",                 16, YAFREC_IP_BOTH },
743     { (char*)"destinationIPv6Address",            16, YAFREC_IP_BOTH },
744     { (char*)"ipNextHopIPv6Address",              16, YAFREC_IP_BOTH },
745     { (char*)"sourceIPv6Address",                 16, YAFREC_ONLY_IP6 },
746     { (char*)"destinationIPv6Address",            16, YAFREC_ONLY_IP6 },
747     { (char*)"ipNextHopIPv6Address",              16, YAFREC_ONLY_IP6 },
748     /* Ignore the the reverse elements for a uniflow record that is
749      * either IPv6 only or has both IPv4 and IPv6 IPs. */
750     { (char*)"paddingOctets",                     32, (YAFREC_IP_BOTH
751                                                        | YAFREC_UNI) },
752     { (char*)"paddingOctets",                     32, (YAFREC_ONLY_IP6
753                                                        | YAFREC_UNI) },
754     /* Volume (reverse), as either Delta or Total */
755     { (char*)"reversePacketDeltaCount",            8, (YAFREC_DELTA
756                                                        | YAFREC_BI) },
757     { (char*)"reverseOctetDeltaCount",             8, (YAFREC_DELTA
758                                                        | YAFREC_BI) },
759     { (char*)"reversePacketTotalCount",            8, (YAFREC_TOTAL
760                                                        | YAFREC_BI) },
761     { (char*)"reverseOctetTotalCount",             8, (YAFREC_TOTAL
762                                                        | YAFREC_BI) },
763     /* Initial packet roundtrip */
764     { (char*)"reverseFlowDeltaMilliseconds",       4, YAFREC_BI },
765     /* Vlan IDs (reverse) */
766     { (char*)"reverseVlanId",                      2, YAFREC_BI },
767     { (char*)"reversePostVlanId",                  2, YAFREC_BI },
768     /* Flow attributes (reverse) */
769     { (char*)"reverseFlowAttributes",              2, YAFREC_BI },
770     /* TCP Flags (reverse) */
771     { (char*)"reverseTcpControlBits",              1, YAFREC_BI },
772     { (char*)"reverseInitialTCPFlags",             1, YAFREC_BI },
773     { (char*)"reverseUnionTCPFlags",               1, YAFREC_BI },
774     { (char*)"reverseIpClassOfService",            1, YAFREC_BI },
775     { (char*)"paddingOctets",                      2, YAFREC_BI },
776     { (char*)"subTemplateMultiList",               0, YAFREC_STML },
777     FB_IESPEC_NULL
778 };
779 
780 typedef struct ski_yafrec_st {
781     uint16_t        sourceTransportPort;            /*   0-  1 */
782     uint16_t        destinationTransportPort;       /*   2-  3 */
783 
784     uint8_t         protocolIdentifier;             /*   4     */
785     uint8_t         tcpControlBits;                 /*   5     */
786     uint8_t         initialTCPFlags;                /*   6     */
787     uint8_t         unionTCPFlags;                  /*   7     */
788 
789     uint32_t        ingressInterface;               /*   8- 11 */
790     uint32_t        egressInterface;                /*  12- 15 */
791 
792     /* may also hold packetTotalCount and octetTotalCount values */
793     uint64_t        packetDeltaCount;               /*  16- 23 */
794     uint64_t        octetDeltaCount;                /*  24- 31 */
795 
796     /* Flow attribute flags (reverse value below) */
797     uint16_t        flowAttributes;                 /*  32- 33 */
798 
799     /* SiLK valuee */
800     uint16_t        silkAppLabel;                   /*  34- 35 */
801     uint16_t        silkFlowSensor;                 /*  36- 37 */
802     uint8_t         silkFlowType;                   /*  38     */
803     uint8_t         silkTCPState;                   /*  39     */
804 
805     /* Time can be represented in many different formats: */
806     uint64_t        flowStartMilliseconds;          /*  40- 55 */
807     uint64_t        flowEndMilliseconds;            /*  48- 63 */
808 
809     /* vlan IDs (reverse values below) */
810     uint16_t        vlanId;                         /*  56- 57 */
811     uint16_t        postVlanId;                     /*  58- 59 */
812 
813     /* ICMP, may be icmpTypeCodeIPv4 or icmpTypeCodeIPv6 */
814     uint16_t        icmpTypeCode;                   /*  60- 61 */
815     uint8_t         flowEndReason;                  /*  62     */
816     uint8_t         ipClassOfService;               /*  63     */
817 
818     uint32_t        sourceIPv4Address;              /*  64- 67 */
819     uint32_t        destinationIPv4Address;         /*  68- 71 */
820     uint32_t        ipNextHopIPv4Address;           /*  72- 75 */
821     uint32_t        paddingOctets_1;                /*  76- 79 */
822 
823     uint8_t         sourceIPv6Address[16];          /*  80- 95 */
824     uint8_t         destinationIPv6Address[16];     /*  96-111 */
825     uint8_t         ipNextHopIPv6Address[16];       /* 112-127 */
826 
827     /* may also hold reversePacketTotalCount and
828      * reverseOctetTotalCount values */
829     uint64_t        reversePacketDeltaCount;        /* 128-135 */
830     uint64_t        reverseOctetDeltaCount;         /* 136-143 */
831 
832     /* start time of reverse flow, as millisec offset from start time
833      * of forward flow */
834     uint32_t        reverseFlowDeltaMilliseconds;   /* 144-147 */
835 
836     /* vlan IDs (reverse) */
837     uint16_t        reverseVlanId;                  /* 148-149 */
838     uint16_t        reversePostVlanId;              /* 150-151 */
839 
840     /* Flow attribute flags (reverse) */
841     uint16_t        reverseFlowAttributes;          /* 152-153 */
842 
843    /* Flags for the reverse flow: */
844     uint8_t         reverseTcpControlBits;          /* 154     */
845     uint8_t         reverseInitialTCPFlags;         /* 155     */
846     uint8_t         reverseUnionTCPFlags;           /* 156     */
847     uint8_t         reverseIpClassOfService;        /* 157     */
848     uint16_t        paddingOctets_2;                /* 158-159 */
849 
850     /* TCP flags from yaf (when it is run without --silk) */
851     fbSubTemplateMultiList_t stml;                  /* 160-... */
852 } ski_yafrec_t;
853 
854 
855 /*
856  *  **********  YAF SubTemplateMultiList TCP Info Template  **********
857  *
858  *    Use the following to read TCP flags that YAF has exported in an
859  *    IPFIX subTemplateMultiList.
860  */
861 
862 /* Incoming Template ID used by YAF for a subTemplateMultiList item
863  * containing only forward TCP flags information. */
864 #define SKI_YAF_TCP_FLOW_TID    0xC003
865 
866 /* Bit in the incoming Template ID that yaf sets for templates
867  * containing reverse elements */
868 #define SKI_YAF_REVERSE_BIT     0x0010
869 
870 /* Internal Template ID */
871 #define SKI_TCP_STML_TID        0xAFEC
872 
873 static fbInfoElementSpec_t ski_tcp_stml_spec[] = {
874     { (char*)"initialTCPFlags",                    1, 0 },
875     { (char*)"unionTCPFlags",                      1, 0 },
876     { (char*)"reverseInitialTCPFlags",             1, 0 },
877     { (char*)"reverseUnionTCPFlags",               1, 0 },
878     FB_IESPEC_NULL
879 };
880 
881 typedef struct ski_tcp_stml_st {
882     uint8_t         initialTCPFlags;
883     uint8_t         unionTCPFlags;
884     uint8_t         reverseInitialTCPFlags;
885     uint8_t         reverseUnionTCPFlags;
886 } ski_tcp_stml_t;
887 
888 
889 /*
890  *  **********  YAF Statistics Options Template  **********
891  *
892  *    Information for statistics information exported by YAF.  The
893  *    template and structure are based on the yaf 2.3.0 manual page.
894  *    The templates and struct are used by ski_yafstats_next() when
895  *    reading data.
896  *
897  *    The type for these records is SKI_RECTYPE_YAFSTATS
898  *
899  *    These types are defined in ipfixsource.h so they may be shared
900  *    with ipfixsource.c and that the source structure may contain
901  *    them.
902  *
903  *    #define SKI_YAFSTATS_TID        0xD000
904  *
905  *    static fbInfoElementSpec_t ski_yafstats_spec[] = {...};
906  *
907  *    typedef struct ski_yafstats_st { ... } ski_yafstats_t;
908  *
909  */
910 
911 
912 
913 /*
914  *  **********  NetFlowV9 Data Template  **********
915  *
916  *    Define the list of information elements and the corresponding
917  *    struct for reading common NetFlowV9 records.  The templates and
918  *    struct are used by ski_nf9rec_next() when reading data.
919  *
920  *    The type for these records is SKI_RECTYPE_NF9REC
921  */
922 
923 /*
924  *    There are several templates defined here.  The following macros
925  *    determine which elements in the ski_nf9rec_spec[] are used.  For
926  *    the template that uses the elements, the correspong bits are set
927  *    to high in the SKI_NF9REC_TID below.
928  *
929  *    On SKI_NF9REC_TID, high bits are 1, 13, 14.
930  */
931 #define NF9REC_DELTA        (1 <<  2)
932 #define NF9REC_TOTAL        (1 <<  3)
933 #define NF9REC_INITIATOR    (1 <<  4)
934 
935 #define NF9REC_IP4          (1 <<  5)
936 #define NF9REC_IP6          (1 <<  6)
937 
938 #define NF9REC_SYSUP        (1 <<  7)
939 #define NF9REC_MILLI        (1 <<  8)
940 
941 #define SKI_NF9REC_TID      0x6002
942 #if (SKI_NF9REC_TID & BMAP_RECTYPE_MASK) != BMAP_RECTYPE_NF9REC
943 #error "bad SKI_NF9REC_TID value"
944 #endif
945 
946 static fbInfoElementSpec_t ski_nf9rec_spec[] = {
947     /* Ports, Protocol */
948     { (char*)"sourceTransportPort",                2, 0 },
949     { (char*)"destinationTransportPort",           2, 0 },
950     { (char*)"protocolIdentifier",                 1, 0 },
951     /* TCP Flags */
952     { (char*)"tcpControlBits",                     1, 0 },
953     /* End reason */
954     { (char*)"flowEndReason",                      1, 0 },
955     /* TOS */
956     { (char*)"ipClassOfService",                   1, 0 },
957     /* Router interfaces */
958     { (char*)"ingressInterface",                   4, 0 },
959     { (char*)"egressInterface",                    4, 0 },
960     /* Volume, in one of three different ways; initiatorOctets has
961      * matching responderOctets.  This does not handle OUT_BYTES(23)
962      * and OUT_PACKETS(24) IEs, that fixbuf translates to reverse
963      * Delta elements */
964     { (char*)"packetDeltaCount",                   8, NF9REC_DELTA },
965     { (char*)"octetDeltaCount",                    8, NF9REC_DELTA },
966     { (char*)"postPacketDeltaCount",               8, NF9REC_DELTA },
967     { (char*)"postOctetDeltaCount",                8, NF9REC_DELTA },
968     { (char*)"packetTotalCount",                   8, NF9REC_TOTAL },
969     { (char*)"octetTotalCount",                    8, NF9REC_TOTAL },
970     { (char*)"postPacketTotalCount",               8, NF9REC_TOTAL },
971     { (char*)"postOctetTotalCount",                8, NF9REC_TOTAL },
972     { (char*)"initiatorPackets",                   8, NF9REC_INITIATOR},
973     { (char*)"initiatorOctets",                    8, NF9REC_INITIATOR },
974     { (char*)"responderPackets",                   8, NF9REC_INITIATOR},
975     { (char*)"responderOctets",                    8, NF9REC_INITIATOR },
976     /* SysUpTime, used to handle Netflow v9 SysUpTime offset times */
977     { (char*)"systemInitTimeMilliseconds",         8, NF9REC_SYSUP },
978     { (char*)"flowStartSysUpTime",                 4, NF9REC_SYSUP },
979     { (char*)"flowEndSysUpTime",                   4, NF9REC_SYSUP },
980     /* Millisecond start and end. Note that end uses the
981      * observationTime value, though the structure calls it
982      * flowEndMilliseconds. */
983     { (char*)"flowStartMilliseconds",              8, NF9REC_MILLI },
984     { (char*)"observationTimeMilliseconds",        8, NF9REC_MILLI },
985     /* Vlan IDs */
986     { (char*)"vlanId",                             2, 0 },
987     { (char*)"postVlanId",                         2, 0 },
988     /* ICMP, either IPv4 or IPv6 */
989     { (char*)"icmpTypeCodeIPv4",                   2, NF9REC_IP4 },
990     { (char*)"icmpTypeIPv4",                       1, NF9REC_IP4 },
991     { (char*)"icmpCodeIPv4",                       1, NF9REC_IP4 },
992     { (char*)"icmpTypeCodeIPv6",                   2, NF9REC_IP6 },
993     { (char*)"icmpTypeIPv6",                       1, NF9REC_IP6 },
994     { (char*)"icmpCodeIPv6",                       1, NF9REC_IP6 },
995     /* IPv4 Addresses */
996     { (char*)"sourceIPv4Address",                  4, NF9REC_IP4 },
997     { (char*)"destinationIPv4Address",             4, NF9REC_IP4 },
998     { (char*)"ipNextHopIPv4Address",               4, NF9REC_IP4 },
999     /* Pad to the firewall event: 3*(16-4)+4 */
1000     { (char*)"paddingOctets",                     40, NF9REC_IP4 },
1001     /* IPv6 Addresses */
1002     { (char*)"sourceIPv6Address",                 16, NF9REC_IP6 },
1003     { (char*)"destinationIPv6Address",            16, NF9REC_IP6 },
1004     { (char*)"ipNextHopIPv6Address",              16, NF9REC_IP6 },
1005 
1006     /* Firewall events */
1007     { (char*)"paddingOctets",                      4, NF9REC_IP6 },
1008     { (char*)"firewallEvent",                      1, 0 },
1009     { (char*)"NF_F_FW_EVENT",                      1, 0 },
1010     { (char*)"NF_F_FW_EXT_EVENT",                  2, 0 },
1011     FB_IESPEC_NULL
1012 };
1013 
1014 typedef struct ski_nf9rec_st {
1015     uint16_t        sourceTransportPort;            /*   0-  1 */
1016     uint16_t        destinationTransportPort;       /*   2-  3 */
1017 
1018     uint8_t         protocolIdentifier;             /*   4     */
1019     uint8_t         tcpControlBits;                 /*   5     */
1020     uint8_t         flowEndReason;                  /*   6     */
1021     uint8_t         ipClassOfService;               /*   7     */
1022 
1023     uint32_t        ingressInterface;               /*   8- 11 */
1024     uint32_t        egressInterface;                /*  12- 15 */
1025 
1026     /* may also hold packetTotalCount and octetTotalCount,
1027      * initiatorPackets and initiatorOctets */
1028     uint64_t        packetDeltaCount;               /*  16- 23 */
1029     uint64_t        octetDeltaCount;                /*  24- 31 */
1030 
1031     /* postPacketDeltaCount and postOctetDeltaCount; or
1032      * postPacketTotalCount and postPacketTotalCount; or
1033      * responderPackets and responderOctets when the NF9REC_INITIATOR
1034      * bit is set */
1035     uint64_t        postPacketDeltaCount;           /*  32- 39 */
1036     uint64_t        postOctetDeltaCount;            /*  40- 47 */
1037 
1038     union nf9rec_time_un {
1039         /* Traditional NetFlow time uses SysUptime */
1040         struct nf9rec_time_sysup_st {
1041             uint64_t        systemInitTimeMilliseconds;     /*  48- 55 */
1042             uint32_t        flowStartSysUpTime;             /*  56- 59 */
1043             uint32_t        flowEndSysUpTime;               /*  60- 63 */
1044         }           sysup;
1045         struct nf9rec_time_milli_st {
1046             uint64_t        flowStartMilliseconds;          /*  48- 55 */
1047             uint64_t        flowEndMilliseconds;            /*  56- 63 */
1048         }           milli;
1049     }               t;
1050 
1051     /* vlan IDs (reverse values below) */
1052     uint16_t        vlanId;                         /*  64- 65 */
1053     uint16_t        postVlanId;                     /*  66- 67 */
1054 
1055     /* ICMP, may be icmpTypeCodeIPv4 or icmpTypeCodeIPv6 */
1056     uint16_t        icmpTypeCode;                   /*  68- 69 */
1057     /* ICMP, may be icmpTypeIPv4 or icmpTypeIPv6 */
1058     uint8_t         icmpType;                       /*  70     */
1059     /* ICMP, may be icmpCodeIPv4 or icmpCodeIPv6 */
1060     uint8_t         icmpCode;                       /*  71     */
1061 
1062     union nf9rec_addr_un {
1063         struct nf9rec_ip4_st {
1064             uint32_t        sourceIPv4Address;              /*  72- 75 */
1065             uint32_t        destinationIPv4Address;         /*  76- 79 */
1066             uint32_t        ipNextHopIPv4Address;           /*  80- 83 */
1067         }           ip4;
1068         struct nf9rec_ip6_st {
1069             uint8_t         sourceIPv6Address[16];          /*  72- 87 */
1070             uint8_t         destinationIPv6Address[16];     /*  88-103 */
1071             uint8_t         ipNextHopIPv6Address[16];       /* 104-119 */
1072         }           ip6;
1073     }               addr;
1074 
1075     uint32_t        paddingOctets;                  /* 120-123 */
1076     /* Firewall events */
1077     uint8_t         firewallEvent;                  /* 124     */
1078     uint8_t         NF_F_FW_EVENT;                  /* 125     */
1079     uint16_t        NF_F_FW_EXT_EVENT;              /* 126-127 */
1080 } ski_nf9rec_t;
1081 
1082 
1083 /*
1084  *  **********  Tombstone Record Options Template  **********
1085  *
1086  *    Define the list of information elements and the corresponding
1087  *    structs for reading YAF Options Template records that contain a
1088  *    tombstone counter.  The templates and structs are used by
1089  *    ski_tombstone_next() when reading data.
1090  *
1091  *    The type for these records is SKI_RECTYPE_TOMBSTONE
1092  *
1093  *    The records include a subTemplateList represented by
1094  *    ski_tombstone_access_spec[], ski_tombstone_access_t, and
1095  *    SKI_TOMBSTONE_ACCESS_TID.
1096  */
1097 
1098 #define SKI_TOMBSTONE_TID           0xAFEE
1099 
1100 /* the internal template id */
1101 #define SKI_TOMBSTONE_ACCESS_TID    0xAFE9
1102 
1103 /* the external template id for the timestamp list */
1104 #define SKI_YAF_TOMBSTONE_ACCESS    0xD002
1105 
1106 /* tombstoneId, exporterConfiguredId, exporterUniqueId, certToolId,
1107  * and tombstoneAccessList are CERT_PEN elements, IDs 550-554 */
1108 static fbInfoElementSpec_t ski_tombstone_spec[] = {
1109     { (char*)"observationDomainId",       4, 0 },    /* 149 */
1110     { (char*)"exportingProcessId",        4, 0 },    /* 144 */
1111     { (char*)"exporterConfiguredId",      2, 0 },    /* CERT_PEN, 551 */
1112     { (char*)"exporterUniqueId",          2, 0 },    /* CERT_PEN, 552 */
1113     { (char*)"paddingOctets",             4, 0 },    /* 210 */
1114     { (char*)"tombstoneId",               4, 0 },    /* CERT_PEN, 550 */
1115     { (char*)"observationTimeSeconds",    4, 0 },    /* 322 */
1116 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
1117     { (char*)"subTemplateList",           0, 0 },    /* 292 */
1118 #if FIXBUF_CHECK_VERSION(2,3,0)
1119     /* because fixbuf < 2.3.0 does not decode list-type elements
1120      * correctly, only use the element with fixbuf >= 2.3.0. */
1121     { (char*)"tombstoneAccessList",       0, 0 },    /* CERT_PEN, 554 */
1122 #endif
1123 #endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
1124     FB_IESPEC_NULL
1125 };
1126 
1127 typedef struct ski_tombstone_st {
1128     uint32_t    observationDomainId;            /*  0 -  3 */
1129     uint32_t    exportingProcessId;             /*  4 -  7 */
1130     uint16_t    exporterConfiguredId;           /*  8 -  9 */
1131     uint16_t    exporterUniqueId;               /* 10 - 11 */
1132     uint32_t    paddingOctets;                  /* 12 - 15 */
1133     uint32_t    tombstoneId;                    /* 16 - 19 */
1134     uint32_t    observationTimeSeconds;         /* 20 - 23 */
1135 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
1136     fbSubTemplateList_t stl;                    /* 24...   */
1137 #if FIXBUF_CHECK_VERSION(2,3,0)
1138     fbSubTemplateList_t tombstoneAccessList;    /* ...     */
1139 #endif
1140 #endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
1141 } ski_tombstone_t;
1142 
1143 
1144 /* The template used by the subTemplateList */
1145 static fbInfoElementSpec_t ski_tombstone_access_spec[] = {
1146     { (char*)"certToolId",                4, 0 },    /* CERT_PEN, 553 */
1147     { (char*)"exportingProcessId",        4, 0 },    /* 144 */
1148     { (char*)"observationTimeSeconds",    4, 0 },    /* 322 */
1149     FB_IESPEC_NULL
1150 };
1151 
1152 typedef struct ski_tombstone_access_st {
1153     uint32_t certToolId;
1154     uint32_t exportingProcessId;
1155     uint32_t observationTimeSeconds;
1156 } ski_tombstone_access_t;
1157 
1158 
1159 
1160 /*
1161  *  **********  NetFlowV9 Sampling Options Template  **********
1162  *
1163  *    Define the list of information elements and the corresponding
1164  *    struct for reading NetFlowV9 Options Template records that
1165  *    contain sampling information.  The template and struct are used
1166  *    by ski_nf9sampling_next() when reading data.
1167  *
1168  *    The type for these records is SKI_RECTYPE_NF9SAMPLING
1169  */
1170 
1171 #define SKI_NF9SAMPLING_TID     0xAFEF
1172 
1173 #define SKI_NF9SAMPLING_PADDING 5
1174 
1175 static fbInfoElementSpec_t ski_nf9sampling_spec[] = {
1176     { (char*)"samplingInterval",          4, 0 },    /* 34 */
1177 
1178     { (char*)"flowSamplerRandomInterval", 4, 1 },    /* 50, current fixbuf */
1179     { (char*)"samplerRandomInterval",     4, 2 },    /* 50, future fixbuf */
1180 
1181     { (char*)"samplingAlgorithm",         1, 0 },    /* 35 */
1182 
1183     { (char*)"flowSamplerMode",           1, 1 },    /* 49, current fixbuf */
1184     { (char*)"samplerMode",               1, 2 },    /* 49, future fixbuf */
1185 
1186     { (char*)"flowSamplerID",             1, 1 },    /* 48, current fixbuf */
1187     { (char*)"samplerId",                 1, 2 },    /* 48, future fixbuf */
1188 
1189 #if SKI_NF9SAMPLING_PADDING != 0
1190     { (char*)"paddingOctets",             SKI_NF9SAMPLING_PADDING, 0 },
1191 #endif
1192     FB_IESPEC_NULL
1193 };
1194 
1195 typedef struct ski_nf9sampling_st {
1196     uint32_t    samplingInterval;
1197     uint32_t    samplerRandomInterval;
1198     uint8_t     samplingAlgorithm;
1199     uint8_t     samplerMode;
1200     uint8_t     samplerId;
1201 #if SKI_NF9SAMPLING_PADDING != 0
1202     uint8_t     paddingOctets[SKI_NF9SAMPLING_PADDING];
1203 #endif
1204 } ski_nf9sampling_t;
1205 
1206 
1207 /*
1208  *  **********  Simple Template for Ignoring Data  **********
1209  *
1210  *    Simple template for reading data that is thrown away.  The
1211  *    template and struct are used by ski_ignore_next() when reading
1212  *    data.
1213  *
1214  *    The type for these records is SKI_RECTYPE_IGNORE
1215  */
1216 
1217 #define SKI_IGNORE_TID          0x4444
1218 
1219 static fbInfoElementSpec_t ski_ignore_spec[] = {
1220     { (char*)"systemInitTimeMilliseconds",         8, 0 },
1221     FB_IESPEC_NULL
1222 };
1223 
1224 typedef struct ski_ignore_st {
1225     uint64_t    systemInitTimeMilliseconds;
1226 } ski_ignore_t;
1227 
1228 
1229 /*
1230  *  **********  Union Across All Record Types  **********
1231  */
1232 
1233 /* Types of IPFIX records.  Returned by ski_rectype_next(). */
1234 typedef enum ski_rectype_en {
1235     SKI_RECTYPE_ERROR,
1236     SKI_RECTYPE_FIXREC,
1237     SKI_RECTYPE_YAFREC,
1238     SKI_RECTYPE_NF9REC,
1239     SKI_RECTYPE_YAFSTATS,
1240     SKI_RECTYPE_TOMBSTONE,
1241     SKI_RECTYPE_NF9SAMPLING,
1242     SKI_RECTYPE_IGNORE
1243 } ski_rectype_t;
1244 
1245 #if TRACEMSG_LEVEL >= 2
1246 static const char *ski_rectype_name[] = {
1247     "SKI_RECTYPE_ERROR",
1248     "SKI_RECTYPE_FIXREC",
1249     "SKI_RECTYPE_YAFREC",
1250     "SKI_RECTYPE_NF9REC",
1251     "SKI_RECTYPE_YAFSTATS",
1252     "SKI_RECTYPE_TOMBSTONE",
1253     "SKI_RECTYPE_NF9SAMPLING",
1254     "SKI_RECTYPE_IGNORE"
1255 };
1256 #endif
1257 
1258 struct ski_record_st {
1259     /* Template used to read this record */
1260     fbTemplate_t       *tmpl;
1261     /* The bitmap value that is tmpl's context */
1262     BMAP_TYPE           bmap;
1263     /* The ID of tmpl */
1264     uint16_t            tid;
1265     /* The record type that is decided from the bitmap */
1266     ski_rectype_t       rectype;
1267     /* A pointer to the forward rwRec to be filled */
1268     rwRec              *fwd_rec;
1269     /* A pointer to the reverse rwRec to be filled */
1270     rwRec              *rev_rec;
1271     /* The IPFIX record */
1272     union data_un {
1273         ski_fixrec_t        fixrec;
1274         ski_yafrec_t        yafrec;
1275         ski_nf9rec_t        nf9rec;
1276         ski_yafstats_t      yafstats;
1277         ski_tombstone_t     tombstone;
1278         ski_nf9sampling_t   nf9sampling;
1279         ski_ignore_t        ignore;
1280     }                   data;
1281 };
1282 typedef struct ski_record_st ski_record_t;
1283 
1284 
1285 
1286 /*
1287  *  **********  FUNCTION DEFINITIONS  **********
1288  */
1289 
1290 /*  Create macros to assist in creating TRACEMSG()s.  To avoid a C89
1291  *  compiler warning, put DEFINE_PREFIX_BUF() after other variable
1292  *  definitions but before any other code.  */
1293 #if TRACEMSG_LEVEL < 2
1294 #define DEFINE_PREFIX_BUF(varname)
1295 #define makeTracemsgPrefix(_a, _b, _c, _d, _e, _f)
1296 #else
1297 #define DEFINE_PREFIX_BUF(varname)    char varname [512]
1298 /*
1299  *    Format the probe name, template id, domain, and template pointer
1300  *    in a string buffer.
1301  */
1302 static char *
makeTracemsgPrefix(char * buf,size_t buflen,const char * probe_name,uint32_t domain,uint16_t tid,const fbTemplate_t * tmpl)1303 makeTracemsgPrefix(
1304     char               *buf,
1305     size_t              buflen,
1306     const char         *probe_name,
1307     uint32_t            domain,
1308     uint16_t            tid,
1309     const fbTemplate_t *tmpl)
1310 {
1311     snprintf(buf, buflen, "'%s': Template %#06x, domain %#x, [%p]",
1312              probe_name, tid, domain, (void *)tmpl);
1313     return buf;
1314 }
1315 #endif  /* TRACEMSG_LEVEL */
1316 
1317 /*
1318  *    The skiTemplateCallbackCtx() callback is invoked whenever the
1319  *    session receives a new template.  This function must have the
1320  *    signature defined by a typedef defined by libfixbuf.
1321  *
1322  *    In fixbuf 2.x, the callback is set by calling
1323  *    fbSessionAddNewTemplateCallback() and its signature is given by
1324  *    'fbNewTemplateCallback_fn'.  In fixbuf 1.x, it is set by calling
1325  *    fbSessionAddTemplateCtxCallback2() and its signature is
1326  *    'fbTemplateCtxCallback2_fn'.
1327  *
1328  *    One purpose of the callback is the tell fixbuf how to process
1329  *    items in a subTemplateMultiList.  We tell fixbuf to map from
1330  *    the two templates that yaf uses for TCP flags (one of which has
1331  *    reverse elements and one of which does not) to the struct used
1332  *    in this file.
1333  *
1334  *    The callback also examines the template and sets a context
1335  *    pointer that contains high bits for certain information
1336  *    elements.  See the detailed comment above the "struct elem_st"
1337  *    definition.
1338  *
1339  *    Finally, if the probe has the SOURCE_LOG_TEMPLATES flag set or
1340  *    the global `show_templates` is true (the environment variable
1341  *    named by SKI_ENV_PRINT_TEMPLATES controls this), the templates
1342  *    are printed to the log file.  (`show_templates` is defined in
1343  *    probeconf.c.)
1344  */
1345 static void
skiTemplateCallbackCtx(fbSession_t * session,uint16_t tid,fbTemplate_t * tmpl,void * app_ctx,void ** ctx,fbTemplateCtxFree_fn * ctx_free_fn)1346 skiTemplateCallbackCtx(
1347     fbSession_t            *session,
1348     uint16_t                tid,
1349     fbTemplate_t           *tmpl,
1350     void                   *app_ctx,
1351     void                  **ctx,
1352 #if FIXBUF_CHECK_VERSION(2,0,0)
1353     fbTemplateCtxFree_fn
1354 #else
1355     fbTemplateCtxFree2_fn
1356 #endif
1357     *ctx_free_fn)
1358 {
1359 #define TMPL_PROC_MSG2(is_option, wp2_tmpl_name)                        \
1360     DEBUGMSG(("'%s': Processing " is_option "template %#06x (%u),"      \
1361               " domain %#x, with the %s template"),                     \
1362              name, tid, tid, domain, (wp2_tmpl_name))
1363 
1364 #define TMPL_PROC_MSG(wp_name)     TMPL_PROC_MSG2("", wp_name)
1365 #define TMPL_PROC_MSG_OPT(wp_name) TMPL_PROC_MSG2("options ", wp_name)
1366 
1367     fbCollector_t *coll;
1368     const fbInfoElement_t *ie;
1369     const char *name;
1370     int show_tmpl;
1371     BMAP_TYPE out;
1372     uint64_t bmap;
1373     uint32_t domain;
1374     uint32_t count;
1375     uint32_t scope;
1376     uint32_t i;
1377     int known_id;
1378     DEFINE_PREFIX_BUF(prefix);
1379 
1380     TRACE_ENTRY;
1381     SK_UNUSED_PARAM(app_ctx);
1382 
1383     *ctx = NULL;
1384     *ctx_free_fn = NULL;
1385 
1386     domain = fbSessionGetDomain(session);
1387     count = fbTemplateCountElements(tmpl);
1388     scope = fbTemplateGetOptionsScope(tmpl);
1389     bmap = 0;
1390     out = 0;
1391 
1392     coll = fbSessionGetCollector(session);
1393     if (NULL == coll) {
1394         name = "<udp>";
1395         show_tmpl = show_templates;
1396     } else {
1397         const skIPFIXConnection_t *conn;
1398         const skIPFIXSource_t *source;
1399         conn = (skIPFIXConnection_t *)fbCollectorGetContext(coll);
1400         source = conn->source;
1401         name = source->name;
1402         show_tmpl = skpcProbeGetLogFlags(source->probe) & SOURCE_LOG_TEMPLATES;
1403     }
1404 
1405     makeTracemsgPrefix(prefix, sizeof(prefix), name, domain, tid,tmpl);
1406     TRACEMSG(2, ("%s skiTemplateCallbackCtx()", prefix));
1407 
1408     if (scope) {
1409         unsigned int samplingAlgorithm;
1410         unsigned int samplerMode;
1411 
1412         /* do not define any template pairs for this template */
1413         fbSessionAddTemplatePair(session, tid, 0);
1414 
1415         /* the sampling check requires multiple elements */
1416         samplingAlgorithm = samplerMode = 0;
1417 
1418         for (i = 0; i < count && (ie = fbTemplateGetIndexedIE(tmpl, i)); ++i) {
1419             if (ie->ent == 0) {
1420                 switch (ie->num) {
1421                   case  34:
1422                   case  35:
1423                     /* verify that both samplingInterval and
1424                      * samplingAlgorithm are present */
1425                     ++samplingAlgorithm;
1426                     if (2 == samplingAlgorithm) {
1427                         bmap |= (1 | TMPL_BIT_samplingAlgorithm);
1428                     }
1429                     break;
1430                   case  49:
1431                   case  50:
1432                     /* verify that both samplerMode and
1433                      * samplerRandomInterval are present */
1434                     ++samplerMode;
1435                     if (2 == samplerMode) {
1436                         bmap |= (1 | TMPL_BIT_samplerMode);
1437                     }
1438                     break;
1439                 }
1440             } else if (ie->ent == IPFIX_CERT_PEN) {
1441                 /* CERT PRIVATE ENTERPRISE ELEMENTS */
1442                 switch (ie->num) {
1443                   case 104:
1444                     ASSERT_IE_NAME_IS(ie, flowTableFlushEventCount);
1445                     bmap |= (1 | TMPL_BIT_flowTableFlushEventCount);
1446                     break;
1447                   case 105:
1448                     ASSERT_IE_NAME_IS(ie, flowTablePeakCount);
1449                     bmap |= (1 | TMPL_BIT_flowTablePeakCount);
1450                     break;
1451                   case 550:
1452                     ASSERT_IE_NAME_IS(ie, tombstoneId);
1453                     bmap |= (1 | TMPL_BIT_tombstoneId);
1454                     break;
1455                 }
1456             }
1457             TRACEMSG(
1458                 3, ("%s bmap %#012" PRIx64 ", IE %s (%u/%u)",
1459                     prefix, bmap, ie->ref.canon->ref.name, ie->ent, ie->num));
1460         }
1461         if (bmap) {
1462             out = (BMAP_TYPE)bmap;
1463             BMAP_TMPL_CTX_SET(ctx, ctx_free_fn, out);
1464         }
1465         TMPL_PROC_MSG_OPT(((bmap & (TMPL_BIT_flowTableFlushEventCount
1466                                     | TMPL_BIT_flowTablePeakCount))
1467                            ? "YAFstats"
1468                            : ((bmap & (TMPL_BIT_tombstoneId))
1469                               ? "tombstone"
1470                               : ((bmap & (TMPL_BIT_samplingAlgorithm
1471                                           | TMPL_BIT_samplerMode))
1472                                  ? "sampling"
1473                                  : "ignore"))));
1474 
1475     } else {
1476         /* populate the bitmap */
1477         for (i = 0; i < count && (ie = fbTemplateGetIndexedIE(tmpl, i)); ++i) {
1478             if (ie->ent == 0) {
1479                 /* STANDARD ELEMENT */
1480                 switch (ie->num) {
1481                   case   8:
1482                   case  12:
1483                     /* sourceIPv4Address and/or destinationIPv4Address */
1484                     bmap |= TMPL_BIT_sourceIPv4Address;
1485                     break;
1486                   case  27:
1487                   case  28:
1488                     /* sourceIPv6Address and/or destinationIPv6Address */
1489                     bmap |= TMPL_BIT_sourceIPv6Address;
1490                     break;
1491                   case 1:
1492                     /* octetDeltaCount */
1493                     bmap |= TMPL_BIT_octetDeltaCount;
1494                     break;
1495                   case 2:
1496                     /* packetDeltaCount */
1497                     bmap |= TMPL_BIT_packetDeltaCount;
1498                     break;
1499                   case 85:
1500                     /* octetTotalCount */
1501                     bmap |= TMPL_BIT_octetTotalCount;
1502                     break;
1503                   case 86:
1504                     /* packetTotalCount */
1505                     bmap |= TMPL_BIT_packetTotalCount;
1506                     break;
1507                   case 23:
1508                   case 24:
1509                     /* postOctetDeltaCount and/or postPacketDeltaCount */
1510                     bmap |= TMPL_BIT_postOctetDeltaCount;
1511                     break;
1512                   case 171:
1513                   case 172:
1514                     /* postOctetTotalCount and/or postPacketTotalCount */
1515                     bmap |= TMPL_BIT_postOctetTotalCount;
1516                     break;
1517                   case 32:
1518                   case 139:
1519                     /* icmpTypeCodeIPv4 and/or icmpTypeCodeIPv6 */
1520                     bmap |= TMPL_BIT_icmpTypeCodeIPv4;
1521                     break;
1522                   case 176:
1523                   case 177:
1524                   case 178:
1525                   case 179:
1526                     /* icmpTypeIPv4, icmpCodeIPv4, icmpTypeIPv6, and
1527                      * icmpCodeIPv6 all map to same position */
1528                     bmap |= TMPL_BIT_icmpTypeIPv4;
1529                     break;
1530                   case 231:
1531                   case 298:
1532                     /* initiatorOctets and/or initiatorPackets */
1533                     bmap |= TMPL_BIT_initiatorOctets;
1534                     break;
1535                   case 232:
1536                   case 299:
1537                     /* responderOctets and/or responderPackets */
1538                     bmap |= TMPL_BIT_responderOctets;
1539                     break;
1540 
1541                   case  22:
1542                     ASSERT_IE_NAME_IS(ie, flowStartSysUpTime);
1543                     bmap |= TMPL_BIT_flowStartSysUpTime;
1544                     break;
1545                   case  59:
1546                     ASSERT_IE_NAME_IS(ie, postVlanId);
1547                     bmap |= TMPL_BIT_postVlanId;
1548                     break;
1549                   case 144:
1550                     ASSERT_IE_NAME_IS(ie, exportingProcessId);
1551                     bmap |= TMPL_BIT_exportingProcessId;
1552                     break;
1553                   case 150:
1554                     ASSERT_IE_NAME_IS(ie, flowStartSeconds);
1555                     bmap |= TMPL_BIT_flowStartSeconds;
1556                     break;
1557                   case 151:
1558                     ASSERT_IE_NAME_IS(ie, flowEndSeconds);
1559                     bmap |= TMPL_BIT_flowEndSeconds;
1560                     break;
1561                   case 152:
1562                     ASSERT_IE_NAME_IS(ie, flowStartMilliseconds);
1563                     bmap |= TMPL_BIT_flowStartMilliseconds;
1564                     break;
1565                   case 153:
1566                     ASSERT_IE_NAME_IS(ie, flowEndMilliseconds);
1567                     bmap |= TMPL_BIT_flowEndMilliseconds;
1568                     break;
1569                   case 154:
1570                     ASSERT_IE_NAME_IS(ie, flowStartMicroseconds);
1571                     bmap |= TMPL_BIT_flowStartMicroseconds;
1572                     break;
1573                   case 155:
1574                     ASSERT_IE_NAME_IS(ie, flowEndMicroseconds);
1575                     bmap |= TMPL_BIT_flowEndMicroseconds;
1576                     break;
1577                   case 156:
1578                     ASSERT_IE_NAME_IS(ie, flowStartNanoseconds);
1579                     bmap |= TMPL_BIT_flowStartNanoseconds;
1580                     break;
1581                   case 157:
1582                     ASSERT_IE_NAME_IS(ie, flowEndNanoseconds);
1583                     bmap |= TMPL_BIT_flowEndNanoseconds;
1584                     break;
1585                   case 158:
1586                     ASSERT_IE_NAME_IS(ie, flowStartDeltaMicroseconds);
1587                     bmap |= TMPL_BIT_flowStartDeltaMicroseconds;
1588                     break;
1589                   case 159:
1590                     ASSERT_IE_NAME_IS(ie, flowEndDeltaMicroseconds);
1591                     bmap |= TMPL_BIT_flowEndDeltaMicroseconds;
1592                     break;
1593                   case 160:
1594                     ASSERT_IE_NAME_IS(ie, systemInitTimeMilliseconds);
1595                     bmap |= TMPL_BIT_systemInitTimeMilliseconds;
1596                     break;
1597                   case 161:
1598                     ASSERT_IE_NAME_IS(ie, flowDurationMilliseconds);
1599                     bmap |= TMPL_BIT_flowDurationMilliseconds;
1600                     break;
1601                   case 162:
1602                     ASSERT_IE_NAME_IS(ie, flowDurationMicroseconds);
1603                     bmap |= TMPL_BIT_flowDurationMicroseconds;
1604                     break;
1605                   case 233:
1606                     ASSERT_IE_NAME_IS(ie, firewallEvent);
1607                     bmap |= TMPL_BIT_firewallEvent;
1608                     break;
1609                   case 258:
1610                     ASSERT_IE_NAME_IS(ie, collectionTimeMilliseconds);
1611                     bmap |= TMPL_BIT_collectionTimeMilliseconds;
1612                     break;
1613                   case 293:
1614                     ASSERT_IE_NAME_IS(ie, subTemplateMultiList);
1615                     bmap |= TMPL_BIT_subTemplateMultiList;
1616                     break;
1617                   case 322:
1618                     ASSERT_IE_NAME_IS(ie, observationTimeSeconds);
1619                     bmap |= TMPL_BIT_observationTimeSeconds;
1620                     break;
1621                   case 323:
1622                     ASSERT_IE_NAME_IS(ie, observationTimeMilliseconds);
1623                     bmap |= TMPL_BIT_observationTimeMilliseconds;
1624                     break;
1625                   case 324:
1626                     ASSERT_IE_NAME_IS(ie, observationTimeMicroseconds);
1627                     bmap |= TMPL_BIT_observationTimeMicroseconds;
1628                     break;
1629                   case 325:
1630                     ASSERT_IE_NAME_IS(ie, observationTimeNanoseconds);
1631                     bmap |= TMPL_BIT_observationTimeNanoseconds;
1632                     break;
1633 
1634                   case FB_CISCO_ASA_EVENT_XTRA:
1635                     ASSERT_IE_NAME_IS(ie, NF_F_FW_EXT_EVENT);
1636                     bmap |= TMPL_BIT_NF_F_FW_EXT_EVENT;
1637                     break;
1638                   case FB_CISCO_ASA_EVENT_ID:
1639                     ASSERT_IE_NAME_IS(ie, NF_F_FW_EVENT);
1640                     bmap |= TMPL_BIT_NF_F_FW_EVENT;
1641                     break;
1642                 }
1643             } else if (ie->ent == FB_IE_PEN_REVERSE) {
1644                 /* REVERSE VALUE OF STANDARD ELEMENTS */
1645                 switch (ie->num) {
1646                   case 1:
1647                   case 2:
1648                     /* reverseOctetDeltaCount and/or
1649                      * reversePacketDeltaCount; for NetFlow v9 records
1650                      * they may hold post{Octet,Packet}DeltaCount when
1651                      * using libfixbuf prior to 1.8.0 */
1652                     bmap |= TMPL_BIT_reverseOctetDeltaCount;
1653                     break;
1654                   case 6:
1655                     ASSERT_IE_NAME_IS(ie, reverseTcpControlBits);
1656                     bmap |= TMPL_BIT_reverseTcpControlBits;
1657                     break;
1658                   case 58:
1659                     ASSERT_IE_NAME_IS(ie, reverseVlanId);
1660                     bmap |= TMPL_BIT_reverseVlanId;
1661                     break;
1662                 }
1663             } else if (ie->ent == IPFIX_CERT_PEN) {
1664                 /* CERT PRIVATE ENTERPRISE ELEMENTS */
1665                 switch (ie->num) {
1666                   case 14:
1667                     ASSERT_IE_NAME_IS(ie, initialTCPFlags);
1668                     bmap |= TMPL_BIT_initialTCPFlags;
1669                     break;
1670                   case 14 | FB_IE_VENDOR_BIT_REVERSE:
1671                     ASSERT_IE_NAME_IS(ie, reverseInitialTCPFlags);
1672                     bmap |= TMPL_BIT_reverseInitialTCPFlags;
1673                     break;
1674                   case 21:
1675                     ASSERT_IE_NAME_IS(ie, reverseFlowDeltaMilliseconds);
1676                     bmap |= TMPL_BIT_reverseFlowDeltaMilliseconds;
1677                     break;
1678                   case 553:
1679                     ASSERT_IE_NAME_IS(ie, certToolId);
1680                     bmap |= TMPL_BIT_certToolId;
1681                     break;
1682                 }
1683             }
1684             TRACEMSG(
1685                 3, ("%s bmap %#012" PRIx64 ", IE %s (%u/%u)",
1686                     prefix, bmap, ie->ref.canon->ref.name, ie->ent, ie->num));
1687         }
1688 
1689         /* now that the bitmap is populated, see if it matches some
1690          * expected patterns */
1691 
1692         /* tell fixbuf how to transcode templates that appear in lists */
1693         if (bmap == TMPL_BIT_initialTCPFlags
1694             || bmap == (TMPL_BIT_initialTCPFlags
1695                         | TMPL_BIT_reverseInitialTCPFlags))
1696         {
1697             /* the template ID matches the ID for the YAF template
1698              * that contains TCP flags */
1699             fbSessionAddTemplatePair(session, tid, SKI_TCP_STML_TID);
1700             TMPL_PROC_MSG("YAF TCP flags list");
1701             known_id = 1;
1702 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
1703         } else if ((bmap & TMPL_BIT_certToolId)
1704                    || ((bmap == (TMPL_BIT_exportingProcessId
1705                                  | TMPL_BIT_observationTimeSeconds)
1706                         && count == 2)))
1707         {
1708             /* the template ID matches the ID for the template that
1709              * contains tombstone timestamps */
1710             fbSessionAddTemplatePair(session, tid, SKI_TOMBSTONE_ACCESS_TID);
1711             TMPL_PROC_MSG("tombstone access");
1712             known_id = 1;
1713 #endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
1714         } else {
1715             /* do not define any template pairs for this template */
1716             fbSessionAddTemplatePair(session, tid, 0);
1717             known_id = 0;
1718             /* clear the exportingProcessId bit */
1719             bmap &= ~TMPL_BIT_exportingProcessId;
1720         }
1721 
1722         /* check whether the template may be processed by the YAF
1723          * template by: not using any IEs outside of those defined the
1724          * YAF template, by having IP addresses, by using millisecond
1725          * times, and by having consistent IEs for volume */
1726         if (known_id) {
1727             /* no more tests are needed */
1728         } else if (0 == (bmap & ~TMPL_MASK_YAFREC)
1729             && (bmap & TMPL_MASK_IPADDRESS)
1730             && (bmap & TMPL_MASK_TIME_MILLI_YAF)
1731             && (((bmap & TMPL_MASK_VOLUME_YAF)
1732                  == (TMPL_BIT_octetDeltaCount | TMPL_BIT_packetDeltaCount))
1733                 || ((bmap & TMPL_MASK_VOLUME_YAF)
1734                     ==(TMPL_BIT_octetTotalCount | TMPL_BIT_packetTotalCount))))
1735         {
1736             /* Which volume element is present? */
1737             if ((bmap & TMPL_MASK_VOLUME_YAF)
1738                 == (TMPL_BIT_octetDeltaCount | TMPL_BIT_packetDeltaCount))
1739             {
1740                 out |= YAFREC_DELTA;
1741             } else {
1742                 assert((TMPL_BIT_octetTotalCount | TMPL_BIT_packetTotalCount)
1743                        == (bmap & TMPL_MASK_VOLUME_YAF));
1744                 out |= YAFREC_TOTAL;
1745             }
1746             /* Which IP addresses are present? */
1747             if ((bmap & TMPL_MASK_IPADDRESS) == TMPL_MASK_IPADDRESS) {
1748                 /* Both are */
1749                 out |= YAFREC_IP_BOTH;
1750             } else if (bmap & TMPL_BIT_sourceIPv6Address) {
1751                 out |= YAFREC_ONLY_IP6;
1752             } else {
1753                 assert(bmap & TMPL_BIT_sourceIPv4Address);
1754                 out |= YAFREC_ONLY_IP4;
1755             }
1756 
1757             /* Are TCP flags available without visiting the STML? */
1758             if ((0 == (bmap & TMPL_BIT_initialTCPFlags))
1759                 && (bmap & TMPL_BIT_subTemplateMultiList))
1760             {
1761                 out |= YAFREC_STML;
1762             }
1763             /* Is it a uniflow or a bi flow? */
1764             if (bmap & TMPL_BIT_reverseFlowDeltaMilliseconds) {
1765                 out |= YAFREC_BI;
1766             } else {
1767                 out |= YAFREC_UNI;
1768             }
1769             out |= (SKI_YAFREC_TID
1770                     | (bmap & (TMPL_BIT_reverseVlanId |
1771                                TMPL_BIT_reverseTcpControlBits |
1772                                TMPL_BIT_reverseInitialTCPFlags |
1773                                TMPL_BIT_icmpTypeCodeIPv4)));
1774             BMAP_TMPL_CTX_SET(ctx, ctx_free_fn, out);
1775             TMPL_PROC_MSG("YAF");
1776 
1777         /* check whether the template may be processed by the NetFlow
1778          * v9 template by not having any IEs outside of that set */
1779         } else if ((0 == (bmap & ~TMPL_MASK_NF9REC))
1780                    && (bmap & TMPL_MASK_IPADDRESS))
1781         {
1782             /* this do{}while(0) is not a loop, it is just something that
1783              * "break;" works with */
1784             do {
1785                 /* Which IP addresses are present? */
1786                 if ((bmap &  TMPL_MASK_IPADDRESS)==TMPL_BIT_sourceIPv6Address){
1787                     out |= NF9REC_IP6;
1788                 } else if ((bmap &  TMPL_MASK_IPADDRESS)
1789                            == TMPL_BIT_sourceIPv4Address)
1790                 {
1791                     out |= NF9REC_IP4;
1792                 } else {
1793                     /* cannot use the nf9rec template */
1794                     out = 0;
1795                     break;
1796                 }
1797 
1798                 /* Which time is present */
1799                 if ((bmap & TMPL_MASK_TIME_NF9)== TMPL_MASK_TIME_SYSUP) {
1800                     out |= NF9REC_SYSUP;
1801                 } else if (((bmap & TMPL_MASK_TIME_NF9)
1802                             == TMPL_MASK_TIME_MILLI_NF9)
1803                            || ((bmap & TMPL_MASK_TIME_NF9)
1804                                == TMPL_BIT_observationTimeMilliseconds))
1805                 {
1806                     out |= NF9REC_MILLI;
1807                 } else {
1808                     /* cannot use the nf9rec template */
1809                     out = 0;
1810                     break;
1811                 }
1812 
1813                 /* Which volume is present */
1814                 if (((bmap & TMPL_MASK_VOLUME_NF9)
1815                      & (TMPL_BIT_initiatorOctets
1816                         | TMPL_BIT_responderOctets))
1817                     && 0 == ((bmap & TMPL_MASK_VOLUME_NF9)
1818                              & ~(TMPL_BIT_initiatorOctets
1819                                  | TMPL_BIT_responderOctets)))
1820                 {
1821                     out |= NF9REC_INITIATOR;
1822                 } else if (((bmap & TMPL_MASK_VOLUME_NF9)
1823                             & (TMPL_BIT_octetDeltaCount
1824                                | TMPL_BIT_packetDeltaCount
1825                                | TMPL_BIT_postOctetDeltaCount))
1826                            && 0 == ((bmap & TMPL_MASK_VOLUME_NF9)
1827                                     & ~(TMPL_BIT_octetDeltaCount
1828                                         | TMPL_BIT_packetDeltaCount
1829                                         | TMPL_BIT_postOctetDeltaCount)))
1830                 {
1831                     out |= NF9REC_DELTA;
1832                 } else if (((bmap & TMPL_MASK_VOLUME_NF9)
1833                             & (TMPL_BIT_octetTotalCount
1834                                | TMPL_BIT_packetTotalCount
1835                                | TMPL_BIT_postOctetTotalCount))
1836                            && 0 == ((bmap & TMPL_MASK_VOLUME_NF9)
1837                                     & ~(TMPL_BIT_octetTotalCount
1838                                         | TMPL_BIT_packetTotalCount
1839                                         | TMPL_BIT_postOctetTotalCount)))
1840                 {
1841                     out |= NF9REC_TOTAL;
1842                 } else if (((bmap & TMPL_MASK_VOLUME_NF9) == 0)
1843                            && (bmap & (TMPL_BIT_firewallEvent
1844                                        | TMPL_BIT_NF_F_FW_EVENT)))
1845                 {
1846                     out |= NF9REC_INITIATOR;
1847                 } else {
1848                     /* cannot use the nf9rec template */
1849                     out = 0;
1850                     break;
1851                 }
1852 
1853                 out |= (SKI_NF9REC_TID
1854                         | (bmap & (TMPL_BIT_icmpTypeCodeIPv4 |
1855                                    TMPL_BIT_icmpTypeIPv4 |
1856                                    TMPL_BIT_firewallEvent |
1857                                    TMPL_BIT_NF_F_FW_EVENT |
1858                                    TMPL_BIT_NF_F_FW_EXT_EVENT)));
1859                 BMAP_TMPL_CTX_SET(ctx, ctx_free_fn, out);
1860                 TMPL_PROC_MSG("NFv9");
1861             } while (0);
1862         }
1863 
1864         if (*ctx != NULL || known_id != 0) {
1865             /* template is already handled */
1866         } else if (bmap) {
1867             out = 1 | (BMAP_TYPE)bmap;
1868             BMAP_TMPL_CTX_SET(ctx, ctx_free_fn, out);
1869             TMPL_PROC_MSG("generic");
1870         } else {
1871             TMPL_PROC_MSG("ignore");
1872         }
1873     }
1874 
1875 #if TRACEMSG_LEVEL >= 2
1876     if (*ctx) {
1877         TRACEMSG(2, ("%s bmap " BMAP_PRI ", written", prefix, out));
1878     }
1879 #endif
1880 
1881     if (show_tmpl) {
1882         char buf[0x4000];
1883         char *b = buf;
1884         ssize_t rem = sizeof(buf);
1885         ssize_t sz;
1886 
1887         *b = '\0';
1888         for (i = 0;
1889              i < count && (ie = fbTemplateGetIndexedIE(tmpl, i)) && rem > 2;
1890              ++i)
1891         {
1892             assert(b < (buf + sizeof(buf)));
1893             if (0 == ie->ent) {
1894                 sz = snprintf(b, rem, "%s %s(%u)[%u]%s",
1895                               ((i > 0) ? "," : ""),
1896                               ie->ref.canon->ref.name, ie->num, ie->len,
1897                              ((i < scope) ? "{scope}" : ""));
1898             } else {
1899                 sz = snprintf(b, rem, "%s %s(%u/%u)[%u]%s",
1900                               ((i > 0) ? "," : ""),
1901                               ie->ref.canon->ref.name, ie->ent, ie->num,
1902                               ie->len, ((i < scope) ? "{scope}" : ""));
1903             }
1904             rem -= sz;
1905             b += sz;
1906         }
1907         INFOMSG(("'%s': Contents of %stemplate %#06x (%u),"
1908                  " domain %#x, %" PRIu32 " elements:%s"),
1909                 name, ((scope) ? "options " : ""), tid, tid,
1910                 domain, count, buf);
1911     }
1912 }
1913 
1914 
1915 /*
1916  *    Initialize an fbSession object that reads from either the
1917  *    network or from a file.
1918  *
1919  *    This function updates the fbSession object with (1) the
1920  *    received-new-template callback function and (2) all the
1921  *    templates used when transcoding the incoming data.
1922  */
1923 int
skiSessionInitReader(fbSession_t * session,GError ** err)1924 skiSessionInitReader(
1925     fbSession_t        *session,
1926     GError            **err)
1927 {
1928     fbInfoModel_t   *model;
1929     fbTemplate_t    *tmpl;
1930     const uint32_t yafrec_spec_flags[] = {
1931         /* exported by YAF, either biflow or uniflow, either IPv4 or
1932          * IPv6 (never both), either delta count or total count, with
1933          * or without an STML */
1934         YAFREC_BI  | YAFREC_ONLY_IP4 | YAFREC_DELTA | YAFREC_STML,
1935         YAFREC_BI  | YAFREC_ONLY_IP4 | YAFREC_TOTAL | YAFREC_STML,
1936         YAFREC_BI  | YAFREC_ONLY_IP6 | YAFREC_DELTA | YAFREC_STML,
1937         YAFREC_BI  | YAFREC_ONLY_IP6 | YAFREC_TOTAL | YAFREC_STML,
1938 
1939         YAFREC_UNI | YAFREC_ONLY_IP4 | YAFREC_DELTA | YAFREC_STML,
1940         YAFREC_UNI | YAFREC_ONLY_IP4 | YAFREC_TOTAL | YAFREC_STML,
1941         YAFREC_UNI | YAFREC_ONLY_IP6 | YAFREC_DELTA | YAFREC_STML,
1942         YAFREC_UNI | YAFREC_ONLY_IP6 | YAFREC_TOTAL | YAFREC_STML,
1943 
1944         YAFREC_BI  | YAFREC_ONLY_IP4 | YAFREC_DELTA,
1945         YAFREC_BI  | YAFREC_ONLY_IP4 | YAFREC_TOTAL,
1946         YAFREC_BI  | YAFREC_ONLY_IP6 | YAFREC_DELTA,
1947         YAFREC_BI  | YAFREC_ONLY_IP6 | YAFREC_TOTAL,
1948 
1949         YAFREC_UNI | YAFREC_ONLY_IP4 | YAFREC_DELTA,
1950         YAFREC_UNI | YAFREC_ONLY_IP4 | YAFREC_TOTAL,
1951         YAFREC_UNI | YAFREC_ONLY_IP6 | YAFREC_DELTA,
1952         YAFREC_UNI | YAFREC_ONLY_IP6 | YAFREC_TOTAL,
1953 
1954         /* created by SiLK; always uniflow, never with an STML, always
1955          * delta count; may have both IPv4 and IPv6 */
1956         YAFREC_UNI | YAFREC_DELTA | YAFREC_IP_BOTH,
1957         0                       /* sentinal */
1958     };
1959     const uint32_t nf9rec_spec_flags[] = {
1960         NF9REC_IP4 | NF9REC_SYSUP | NF9REC_DELTA,
1961         NF9REC_IP4 | NF9REC_MILLI | NF9REC_DELTA,
1962         NF9REC_IP6 | NF9REC_SYSUP | NF9REC_DELTA,
1963         NF9REC_IP6 | NF9REC_MILLI | NF9REC_DELTA,
1964 
1965         NF9REC_IP4 | NF9REC_SYSUP | NF9REC_TOTAL,
1966         NF9REC_IP4 | NF9REC_MILLI | NF9REC_TOTAL,
1967         NF9REC_IP6 | NF9REC_SYSUP | NF9REC_TOTAL,
1968         NF9REC_IP6 | NF9REC_MILLI | NF9REC_TOTAL,
1969 
1970         NF9REC_IP4 | NF9REC_SYSUP | NF9REC_INITIATOR,
1971         NF9REC_IP4 | NF9REC_MILLI | NF9REC_INITIATOR,
1972         NF9REC_IP6 | NF9REC_SYSUP | NF9REC_INITIATOR,
1973         NF9REC_IP6 | NF9REC_MILLI | NF9REC_INITIATOR,
1974 
1975         0                       /* sentinal */
1976     };
1977     uint32_t i;
1978     uint16_t tid;
1979 
1980 
1981     /* assert that we are not replacing an existing template */
1982 #ifdef  NDEBUG
1983 #define ASSERT_NO_TMPL(m_session, m_tid, m_err)
1984 #else
1985 #define ASSERT_NO_TMPL(m_session, m_tid, m_err)                         \
1986     do {                                                                \
1987         assert(fbSessionGetTemplate(m_session, TRUE, (m_tid), m_err)    \
1988                == NULL);                                                \
1989         assert(g_error_matches(*(m_err), FB_ERROR_DOMAIN, FB_ERROR_TMPL)); \
1990         g_clear_error(m_err);                                           \
1991     } while(0)
1992 #endif  /* #else of #ifdef NDEBUG */
1993 
1994     model = fbSessionGetInfoModel(session);
1995 
1996     /* Add the "Give me everything" record template */
1997     tmpl = fbTemplateAlloc(model);
1998     if (!fbTemplateAppendSpecArray(tmpl, ski_fixrec_spec, sampler_flags, err)){
1999         goto ERROR;
2000     }
2001     ASSERT_NO_TMPL(session, SKI_FIXREC_TID, err);
2002     if (!fbSessionAddTemplate(session, TRUE, SKI_FIXREC_TID, tmpl, err)) {
2003         goto ERROR;
2004     }
2005 
2006     /* Add the TCP record template */
2007     tmpl = fbTemplateAlloc(model);
2008     if (!fbTemplateAppendSpecArray(tmpl, ski_tcp_stml_spec, 0, err)) {
2009         goto ERROR;
2010     }
2011     ASSERT_NO_TMPL(session, SKI_TCP_STML_TID, err);
2012     if (!fbSessionAddTemplate(session, TRUE, SKI_TCP_STML_TID, tmpl, err)) {
2013         goto ERROR;
2014     }
2015 
2016     /* Add the yaf stats record template  */
2017     tmpl = fbTemplateAlloc(model);
2018     if (!fbTemplateAppendSpecArray(tmpl, ski_yafstats_spec, 0, err)) {
2019         goto ERROR;
2020     }
2021     ASSERT_NO_TMPL(session, SKI_YAFSTATS_TID, err);
2022     if (!fbSessionAddTemplate(session, TRUE, SKI_YAFSTATS_TID, tmpl, err)) {
2023         goto ERROR;
2024     }
2025 
2026     /* Add the yaf tombstone record template  */
2027     tmpl = fbTemplateAlloc(model);
2028     if (!fbTemplateAppendSpecArray(tmpl, ski_tombstone_spec, 0, err)) {
2029         goto ERROR;
2030     }
2031     ASSERT_NO_TMPL(session, SKI_TOMBSTONE_TID, err);
2032     if (!fbSessionAddTemplate(session, TRUE, SKI_TOMBSTONE_TID, tmpl, err)) {
2033         goto ERROR;
2034     }
2035 
2036     /* Add the yaf tombstone record template  */
2037     tmpl = fbTemplateAlloc(model);
2038     if (!fbTemplateAppendSpecArray(tmpl, ski_tombstone_access_spec, 0, err)) {
2039         goto ERROR;
2040     }
2041     ASSERT_NO_TMPL(session, SKI_TOMBSTONE_ACCESS_TID, err);
2042     if (!fbSessionAddTemplate(
2043             session, TRUE, SKI_TOMBSTONE_ACCESS_TID, tmpl, err)) {
2044         goto ERROR;
2045     }
2046 
2047     /* Add the netflow v9 sampling options template  */
2048     tmpl = fbTemplateAlloc(model);
2049     if (!fbTemplateAppendSpecArray(
2050             tmpl, ski_nf9sampling_spec, sampler_flags, err))
2051     {
2052         goto ERROR;
2053     }
2054     ASSERT_NO_TMPL(session, SKI_NF9SAMPLING_TID,err);
2055     if (!fbSessionAddTemplate(session, TRUE, SKI_NF9SAMPLING_TID, tmpl, err)){
2056         goto ERROR;
2057     }
2058 
2059     /* Add the "do nothing/ignore record" template  */
2060     tmpl = fbTemplateAlloc(model);
2061     if (!fbTemplateAppendSpecArray(tmpl, ski_ignore_spec, 0, err)) {
2062         goto ERROR;
2063     }
2064     ASSERT_NO_TMPL(session, SKI_IGNORE_TID, err);
2065     if (!fbSessionAddTemplate(session, TRUE, SKI_IGNORE_TID, tmpl, err)) {
2066         goto ERROR;
2067     }
2068 
2069     /* Add the various ski_yafrec_spec templates */
2070     for (i = 0; yafrec_spec_flags[i] != 0; ++i) {
2071         tmpl = fbTemplateAlloc(model);
2072         if (!fbTemplateAppendSpecArray(
2073                 tmpl, ski_yafrec_spec, yafrec_spec_flags[i], err))
2074         {
2075             goto ERROR;
2076         }
2077         tid = (SKI_YAFREC_TID | yafrec_spec_flags[i]);
2078         ASSERT_NO_TMPL(session, tid, err);
2079         if (!fbSessionAddTemplate(session, TRUE, tid, tmpl, err)) {
2080             goto ERROR;
2081         }
2082     }
2083 
2084     /* Add the various ski_nf9rec_spec templates */
2085     for (i = 0; nf9rec_spec_flags[i] != 0; ++i) {
2086         tmpl = fbTemplateAlloc(model);
2087         if (!fbTemplateAppendSpecArray(
2088                 tmpl, ski_nf9rec_spec, nf9rec_spec_flags[i], err))
2089         {
2090             goto ERROR;
2091         }
2092         tid = (SKI_NF9REC_TID | nf9rec_spec_flags[i]);
2093         ASSERT_NO_TMPL(session, tid, err);
2094         if (!fbSessionAddTemplate(session, TRUE, tid, tmpl, err)) {
2095             goto ERROR;
2096         }
2097     }
2098 
2099     /* Invoke the function above when a new template arrives. */
2100 #if FIXBUF_CHECK_VERSION(2,0,0)
2101     fbSessionAddNewTemplateCallback(session, &skiTemplateCallbackCtx, NULL);
2102 #else
2103     fbSessionAddTemplateCtxCallback2(session, &skiTemplateCallbackCtx, NULL);
2104 #endif
2105 
2106     TRACE_RETURN(1);
2107 
2108   ERROR:
2109     fbTemplateFreeUnused(tmpl);
2110     TRACE_RETURN(0);
2111 }
2112 
2113 
2114 /* **************************************************************
2115  * *****  Support for reading/import
2116  */
2117 
2118 /**
2119  *    Use the external template of the next record to determine its
2120  *    type.
2121  *
2122  *    Fill the 'tmpl' member of 'record' with the incoming template
2123  *    for the next record, fill the 'bmap' member of 'record' with the
2124  *    bitmap settings for that template, fill the 'rectype' member of
2125  *    'record' with the next record's type, and return that type.
2126  */
2127 static ski_rectype_t
ski_rectype_next(fBuf_t * fbuf,ski_record_t * record,GError ** err)2128 ski_rectype_next(
2129     fBuf_t             *fbuf,
2130     ski_record_t       *record,
2131     GError            **err)
2132 {
2133     record->tmpl = fBufNextCollectionTemplate(fbuf, &record->tid, err);
2134     if (record->tmpl == NULL) {
2135         return (record->rectype = SKI_RECTYPE_ERROR);
2136     }
2137     record->bmap = BMAP_TMPL_CTX_GET(record->tmpl);
2138 
2139     /* Handle Records that use an Options Template */
2140     if (fbTemplateGetOptionsScope(record->tmpl)) {
2141         if (record->bmap & (TMPL_BIT_flowTableFlushEventCount
2142                             | TMPL_BIT_flowTablePeakCount))
2143         {
2144             return (record->rectype = SKI_RECTYPE_YAFSTATS);
2145         }
2146         if (record->bmap & TMPL_BIT_tombstoneId) {
2147             return (record->rectype = SKI_RECTYPE_TOMBSTONE);
2148         }
2149         if (record->bmap
2150             & (TMPL_BIT_samplingAlgorithm | TMPL_BIT_samplerMode))
2151         {
2152             return (record->rectype = SKI_RECTYPE_NF9SAMPLING);
2153         }
2154         return (record->rectype = SKI_RECTYPE_IGNORE);
2155     }
2156 
2157     switch (record->bmap & BMAP_RECTYPE_MASK) {
2158       case 4:
2159         return (record->rectype = SKI_RECTYPE_YAFREC);
2160       case 2: case 6:
2161         return (record->rectype = SKI_RECTYPE_NF9REC);
2162       case 1: case 3: case 5: case 7:
2163         return (record->rectype = SKI_RECTYPE_FIXREC);
2164       case 0:
2165         break;
2166       default:
2167         skAbortBadCase(record->bmap & BMAP_RECTYPE_MASK);
2168     }
2169     return (record->rectype = SKI_RECTYPE_IGNORE);
2170 }
2171 
2172 
2173 /**
2174  *    Call fBufNext() and transcode the data into the
2175  *    ski_yafstats_spec template.  Return 1 on success or 0 on
2176  *    failure.
2177  */
2178 static int
ski_yafstats_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)2179 ski_yafstats_next(
2180     fBuf_t                 *fbuf,
2181     ski_record_t           *record,
2182     const skpc_probe_t     *probe,
2183     GError                **err)
2184 {
2185     size_t len;
2186     DEFINE_PREFIX_BUF(prefix);
2187 
2188     SK_UNUSED_PARAM(probe);
2189     makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
2190                        fbSessionGetDomain(fBufGetSession(fbuf)),
2191                        record->tid, record->tmpl);
2192     TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_yafstats_next()"),
2193                  prefix, record->bmap));
2194     assert(SKI_RECTYPE_YAFSTATS == record->rectype);
2195 
2196     /* Set internal template to read a yaf stats record */
2197     if (!fBufSetInternalTemplate(fbuf, SKI_YAFSTATS_TID, err)) {
2198         return FALSE;
2199     }
2200 
2201     len = sizeof(record->data.yafstats);
2202     return fBufNext(fbuf, (uint8_t *)&record->data.yafstats, &len, err);
2203 }
2204 
2205 
2206 /**
2207  *    Update the statistics on 'source' by, for each field, adding to
2208  *    'source' the difference of the value the field in 'record' from
2209  *    the value of the field in 'previous'.  Finally, copy the values
2210  *    from 'current' into 'previous'.
2211  */
2212 static void
ski_yafstats_update_source(skIPFIXSource_t * source,const ski_record_t * record,ski_yafstats_t * previous)2213 ski_yafstats_update_source(
2214     skIPFIXSource_t    *source,
2215     const ski_record_t *record,
2216     ski_yafstats_t     *previous)
2217 {
2218     const ski_yafstats_t *current;
2219 
2220     assert(SKI_RECTYPE_YAFSTATS == record->rectype);
2221 
2222     current = &record->data.yafstats;
2223 
2224     DEBUGMSG("'%s': Got a yaf stats record", source->name);
2225     TRACEMSG(1, (("'%s': "
2226                  "inittime %" PRIu64
2227                  ", dropped %" PRIu64
2228                  ", ignored %" PRIu64
2229                  ", notsent %" PRIu64
2230                  ", expired %" PRIu32
2231                  ", pkttotal %" PRIu64
2232                  ", exported %" PRIu64),
2233                  source->name,
2234                  current->systemInitTimeMilliseconds,
2235                  current->droppedPacketTotalCount,
2236                  current->ignoredPacketTotalCount,
2237                  current->notSentPacketTotalCount,
2238                  current->expiredFragmentCount,
2239                  current->packetTotalCount,
2240                  current->exportedFlowRecordTotalCount));
2241 
2242     if (current->systemInitTimeMilliseconds
2243         != previous->systemInitTimeMilliseconds)
2244     {
2245         memset(previous, 0, sizeof(*previous));
2246     }
2247 
2248     pthread_mutex_lock(&source->stats_mutex);                           \
2249     source->saw_yafstats_pkt = 1;                                      \
2250     source->yaf_dropped_packets += (current->droppedPacketTotalCount
2251                                     - previous->droppedPacketTotalCount);
2252     source->yaf_ignored_packets += (current->ignoredPacketTotalCount
2253                                     - previous->ignoredPacketTotalCount);
2254     source->yaf_notsent_packets += (current->notSentPacketTotalCount
2255                                     - previous->notSentPacketTotalCount);
2256     source->yaf_expired_fragments += (current->expiredFragmentCount
2257                                       - previous->expiredFragmentCount);
2258     source->yaf_processed_packets += (current->packetTotalCount
2259                                       - previous->packetTotalCount);
2260     source->yaf_exported_flows += (current->exportedFlowRecordTotalCount
2261                                    - previous->exportedFlowRecordTotalCount);
2262     pthread_mutex_unlock(&source->stats_mutex);
2263     *previous = *current;
2264 }
2265 
2266 
2267 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
2268 /**
2269  *    Add the access time 'seconds' for tool 'tool_id' to 'buffer'.
2270  */
2271 static ssize_t
ski_tombstone_add_access(char * buffer,size_t length,uint32_t seconds,uint32_t tool_id)2272 ski_tombstone_add_access(
2273     char               *buffer,
2274     size_t              length,
2275     uint32_t            seconds,
2276     uint32_t            tool_id)
2277 {
2278     static const char *tool[] = {
2279         "unknown(0)", "yaf", "super_mediator", "rwflowpack", "rwflowappend",
2280         "mothra-packer", "pipeline"
2281     };
2282     char stime_buf[SKTIMESTAMP_STRLEN];
2283 
2284     sktimestamp_r(stime_buf, sktimeCreate(seconds, 0),
2285                   SKTIMESTAMP_UTC | SKTIMESTAMP_NOMSEC);
2286     if (tool_id < sizeof(tool)/sizeof(tool[0])) {
2287         return snprintf(buffer, length, "; process: %s, time: %sZ",
2288                         tool[tool_id], stime_buf);
2289     }
2290     return snprintf(buffer,length,"; process: unknown(%" PRIu32 "), time: %sZ",
2291                     tool_id, stime_buf);
2292 }
2293 #endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
2294 
2295 /**
2296  *    Read a YAF Options Record containing the tombstone counter and
2297  *    print a log message.
2298  */
2299 static gboolean
ski_tombstone_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)2300 ski_tombstone_next(
2301     fBuf_t                 *fbuf,
2302     ski_record_t           *record,
2303     const skpc_probe_t     *probe,
2304     GError                **err)
2305 {
2306 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
2307     const ski_tombstone_access_t *ts_access;
2308     void *stl;
2309 #endif
2310     const ski_tombstone_t *ts;
2311     char buf[1024];
2312     char *b;
2313     size_t len;
2314     ssize_t sz;
2315 
2316     makeTracemsgPrefix(buf, sizeof(buf), skpcProbeGetName(probe),
2317                        fbSessionGetDomain(fBufGetSession(fbuf)),
2318                        record->tid, record->tmpl);
2319     TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_tombstone_next()"),
2320                  buf, record->bmap));
2321     assert(SKI_RECTYPE_TOMBSTONE == record->rectype);
2322 
2323     /* Set internal template to read the options record */
2324     if (!fBufSetInternalTemplate(fbuf, SKI_TOMBSTONE_TID, err)) {
2325         return FALSE;
2326     }
2327 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
2328     fbSubTemplateListCollectorInit(&record->data.tombstone.stl);
2329 #if FIXBUF_CHECK_VERSION(2,3,0)
2330     fbSubTemplateListCollectorInit(&record->data.tombstone.tombstoneAccessList);
2331 #endif
2332 #endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
2333 
2334     len = sizeof(record->data.tombstone);
2335     if (!fBufNext(fbuf, (uint8_t *)&record->data.tombstone, &len, err)) {
2336         return FALSE;
2337     }
2338     assert(len == sizeof(ski_tombstone_t));
2339     ts = &record->data.tombstone;
2340 
2341     len = sizeof(buf);
2342     b = buf;
2343 
2344     if (ts->stl.numElements) {
2345         /* This tombstone record pre-dates YAF 2.11 */
2346         assert(0 == ts->exportingProcessId);
2347         sz = snprintf(b, len, ("'%s': Received tombstone record:"
2348                                " exporterId: %u:%u, tombstoneId: %u"),
2349                       skpcProbeGetName(probe), ts->exporterConfiguredId,
2350                       ts->exporterUniqueId, ts->tombstoneId);
2351         if (len < (size_t)sz) {
2352             goto WRITEMSG;
2353         }
2354         b += sz;
2355         len -= sz;
2356 
2357 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
2358         stl = NULL;
2359         while ((stl = fbSubTemplateListGetNextPtr(&ts->stl, stl))) {
2360             ts_access = (ski_tombstone_access_t *)stl;
2361             sz = ski_tombstone_add_access(b, len,
2362                                           ts_access->observationTimeSeconds,
2363                                           ts_access->exportingProcessId);
2364             if (len < (size_t)sz) {
2365                 goto WRITEMSG;
2366             }
2367             b += sz;
2368             len -= sz;
2369         }
2370 #endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
2371     } else {
2372         /* This tombstone record is from YAF 2.11 or later */
2373         assert(0 == ts->exporterUniqueId);
2374         sz = snprintf(b, len, ("'%s': Received Tombstone record:"
2375                                " observationDomain:%u,"
2376                                " exporterId:%u:%u, tombstoneId: %u"),
2377                       skpcProbeGetName(probe), ts->observationDomainId,
2378                       ts->exporterConfiguredId, ts->exportingProcessId,
2379                       ts->tombstoneId);
2380         if (len < (size_t)sz) {
2381             goto WRITEMSG;
2382         }
2383         b += sz;
2384         len -= sz;
2385 
2386 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES && FIXBUF_CHECK_VERSION(2,3,0)
2387         stl = NULL;
2388         while ((stl = fbSubTemplateListGetNextPtr(&ts->tombstoneAccessList,
2389                                                   stl)))
2390         {
2391             ts_access = (ski_tombstone_access_t *)stl;
2392             sz = ski_tombstone_add_access(b, len,
2393                                           ts_access->observationTimeSeconds,
2394                                           ts_access->certToolId);
2395             if (len < (size_t)sz) {
2396                 goto WRITEMSG;
2397             }
2398             b += sz;
2399             len -= sz;
2400         }
2401 #endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
2402     }
2403 
2404   WRITEMSG:
2405     buf[sizeof(buf)-1] = '\0';
2406     DEBUGMSG("%s", buf);
2407 
2408 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
2409     fbSubTemplateListClear((fbSubTemplateList_t *)&ts->stl);
2410 #if FIXBUF_CHECK_VERSION(2,3,0)
2411     fbSubTemplateListClear((fbSubTemplateList_t *)&ts->tombstoneAccessList);
2412 #endif
2413 #endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
2414 
2415     return TRUE;
2416 }
2417 
2418 
2419 /* Determine which names are used for certain elements in the
2420  * information model. */
2421 void
ski_nf9sampling_check_spec(void)2422 ski_nf9sampling_check_spec(
2423     void)
2424 {
2425     fbInfoModel_t *model;
2426     const fbInfoElementSpec_t *spec;
2427     uint32_t flags;
2428 
2429     model = skiInfoModel();
2430     flags = 0;
2431 
2432     for (spec = ski_nf9sampling_spec; spec->name; ++spec) {
2433         if (0 == spec->flags) {
2434             assert(fbInfoModelGetElementByName(model, spec->name));
2435         }
2436         else if (fbInfoModelGetElementByName(model, spec->name)) {
2437             if (0 == flags) {
2438                 flags = spec->flags;
2439             } else if (spec->flags != flags) {
2440                 skAppPrintErr("Info Element '%s' is in model; flags = %u",
2441                               spec->name, flags);
2442                 skAbort();
2443             }
2444         } else if (flags && spec->flags == flags) {
2445             skAppPrintErr("Info Element '%s' not in model; flags = %u",
2446                           spec->name, flags);
2447             skAbort();
2448         }
2449     }
2450 
2451     sampler_flags = flags;
2452 
2453     skiInfoModelFree();
2454 }
2455 
2456 
2457 /**
2458  *    Read a NetFlowV9 Options Record regarding the flow sampling rate
2459  *    and write a message to the log file.
2460  */
2461 static gboolean
ski_nf9sampling_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)2462 ski_nf9sampling_next(
2463     fBuf_t                 *fbuf,
2464     ski_record_t           *record,
2465     const skpc_probe_t     *probe,
2466     GError                **err)
2467 {
2468     size_t len;
2469 
2470     DEFINE_PREFIX_BUF(prefix);
2471     makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
2472                        fbSessionGetDomain(fBufGetSession(fbuf)),
2473                        record->tid, record->tmpl);
2474     TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_nf9sampling_next()"),
2475                  prefix, record->bmap));
2476     assert(SKI_RECTYPE_NF9SAMPLING == record->rectype);
2477 
2478     /* Set internal template to read the options record */
2479     if (!fBufSetInternalTemplate(fbuf, SKI_NF9SAMPLING_TID, err)) {
2480         return FALSE;
2481     }
2482 
2483     len = sizeof(record->data.nf9sampling);
2484     if (!fBufNext(fbuf, (uint8_t *)&record->data.nf9sampling, &len, err)) {
2485         return FALSE;
2486     }
2487     assert(len == sizeof(ski_nf9sampling_t));
2488 
2489     if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_SAMPLING) {
2490         if (record->bmap & TMPL_BIT_samplingAlgorithm) {
2491             INFOMSG("'%s': Sampling Algorithm %u; Sampling Interval %u",
2492                     skpcProbeGetName(probe),
2493                     record->data.nf9sampling.samplingAlgorithm,
2494                     record->data.nf9sampling.samplingInterval);
2495         } else if (record->bmap & TMPL_BIT_samplerMode) {
2496             INFOMSG(("'%s': Flow Sampler Id %u; Flow Sampler Mode %u;"
2497                      " Flow Sampler Random Interval %u"),
2498                     skpcProbeGetName(probe),
2499                     record->data.nf9sampling.samplerId,
2500                     record->data.nf9sampling.samplerMode,
2501                     record->data.nf9sampling.samplerRandomInterval);
2502         }
2503     }
2504     return TRUE;
2505 }
2506 
2507 
2508 /**
2509  *    Call fBufNext() and transcode the data into the ski_ignore_spec
2510  *    template.  Return 1 on success or 0 on failure.
2511  */
2512 static int
ski_ignore_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)2513 ski_ignore_next(
2514     fBuf_t                 *fbuf,
2515     ski_record_t           *record,
2516     const skpc_probe_t     *probe,
2517     GError                **err)
2518 {
2519     size_t len;
2520     DEFINE_PREFIX_BUF(prefix);
2521 
2522     SK_UNUSED_PARAM(probe);
2523     makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
2524                        fbSessionGetDomain(fBufGetSession(fbuf)),
2525                        record->tid, record->tmpl);
2526     TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_ignore_next()"),
2527                  prefix, record->bmap));
2528 
2529     if (!fBufSetInternalTemplate(fbuf, SKI_IGNORE_TID, err)) {
2530         return FALSE;
2531     }
2532 
2533     len = sizeof(record->data.ignore);
2534     return fBufNext(fbuf, (uint8_t *)&record->data.ignore, &len, err);
2535 }
2536 
2537 
2538 /*
2539  *    Convert the NTP timestamp (RFC1305) contained in 'ntp' to epoch
2540  *    milliseconds.  The 'is_micro' field should be 0 if the function
2541  *    is decoding dateTimeNanoseconds and non-zero when decoding
2542  *    dateTimeMicroseconds.
2543  *
2544  *    An NTP timestamp is a 64 bit value that has whole seconds in the
2545  *    upper 32 bits and fractional seconds in the lower 32 bits.  Each
2546  *    fractional second represents 1/(2^32)th of a second.
2547  *
2548  *    In addition, NTP uses an epoch time of Jan 1, 1900.
2549  *
2550  *    When the 'is_micro' flag is set, decoding must ignore the 11
2551  *    lowest bits of the fractional part of the timestamp.
2552  *
2553  *    If 'ntp' is 0, assume the element was not in the model and
2554  *    return 0.
2555  */
2556 static uint64_t
skiNTPDecode(uint64_t ntp,int is_micro)2557 skiNTPDecode(
2558     uint64_t            ntp,
2559     int                 is_micro)
2560 {
2561     /* the UNIX epoch as a number of seconds since NTP epoch */
2562 #define JAN_1970  UINT64_C(0x83AA7E80)
2563 
2564     double frac;
2565     uint64_t t;
2566 
2567     if (!ntp) {
2568         return 0;
2569     }
2570     /* handle fractional seconds; convert to milliseconds */
2571     frac = (1000.0 * (ntp & (is_micro ? UINT32_C(0xFFFFF800) : UINT32_MAX))
2572             / (double)UINT64_C(0x100000000));
2573 
2574     /* handle whole seconds, convert to milliseconds */
2575     t = ((ntp >> 32) - JAN_1970) * 1000;
2576 
2577     return t + (uint64_t)frac;
2578 }
2579 
2580 
2581 /*
2582  *    Run the Gauntlet of Time.
2583  *
2584  *    Set the start-time and duration of the 'fwd_rec' member of
2585  *    'record' by checking the 'fixrec' member for the dozen or so
2586  *    time fields that IPFIX provides.
2587  */
2588 static void
skiGauntletOfTime(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe)2589 skiGauntletOfTime(
2590     fBuf_t                 *fbuf,
2591     ski_record_t           *record,
2592     const skpc_probe_t     *probe)
2593 {
2594     struct log_rec_time_st {
2595         /* "raw" start time from the record */
2596         uint64_t        start_val;
2597         /* name of the IE in the 'start_val' member, NULL if none */
2598         const char     *start_name;
2599         /* "raw" end time from the record */
2600         uint64_t        end_val;
2601         /* name of the IE in the 'end_val' member, NULL if none */
2602         const char     *end_name;
2603         /* "raw" duration time from the record */
2604         uint64_t        dur_val;
2605         /* name of the IE in the 'dur_val' member, NULL if none */
2606         const char     *dur_name;
2607     } log_rec_time = {
2608         0, NULL, 0, NULL, 0, NULL
2609     };
2610     char stime_buf[SKTIMESTAMP_STRLEN];
2611     uint64_t sTime, eTime;
2612     uint32_t duration;
2613     sktime_t export_time;
2614     intmax_t uptime, difference;
2615     sktime_t export_msec;
2616     const char *rollover_first;
2617     const char *rollover_last = "";
2618     const ski_fixrec_t *fixrec;
2619     rwRec *rec;
2620 
2621     /* which of sTime(1), duration(2), and eTime(4) were found, and
2622      * whether export_time(8) was used for sTime or eTime */
2623     unsigned int time_fields;
2624 
2625 #define COMPUTE_DURATION(s_time, e_time)                        \
2626     (((e_time < s_time) || ((e_time - s_time) > UINT32_MAX))    \
2627      ? UINT32_MAX                                               \
2628      : (e_time - s_time))
2629 
2630 #define RECORD_SET_TIME_RETURN_NO_LOG(s_time, elapsed)                  \
2631     rwRecSetStartTime(rec, (sktime_t)s_time);                           \
2632     rwRecSetElapsed(rec, elapsed);                                      \
2633     if (!(skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS)) {       \
2634         return;                                                         \
2635     }
2636 
2637     fixrec = &record->data.fixrec;
2638     rec = record->fwd_rec;
2639 
2640     TRACEMSG(3, ("bmap & time_mask: " BMAP_PRI " => " BMAP_PRI,
2641                  record->bmap,
2642                  (BMAP_TYPE)(record->bmap & TMPL_MASK_GAUNTLET_OF_TIME)));
2643 
2644     switch (record->bmap & TMPL_MASK_GAUNTLET_OF_TIME) {
2645       case (TMPL_BIT_flowStartMilliseconds | TMPL_BIT_flowEndMilliseconds):
2646         sTime = fixrec->flowStartMilliseconds;
2647         eTime = fixrec->flowEndMilliseconds;
2648         RECORD_SET_TIME_RETURN_NO_LOG(sTime, COMPUTE_DURATION(sTime, eTime));
2649         time_fields = 5;
2650         log_rec_time.start_val = fixrec->flowStartMilliseconds;
2651         log_rec_time.start_name = "flowStartMilliseconds";
2652         log_rec_time.end_val = fixrec->flowEndMilliseconds;
2653         log_rec_time.end_name = "flowEndMilliseconds";
2654         break;
2655 
2656       case (TMPL_BIT_flowStartSeconds | TMPL_BIT_flowEndSeconds):
2657         sTime = sktimeCreate(fixrec->flowStartSeconds, 0);
2658         eTime = sktimeCreate(fixrec->flowEndSeconds, 0);
2659         RECORD_SET_TIME_RETURN_NO_LOG(sTime, COMPUTE_DURATION(sTime, eTime));
2660         time_fields = 5;
2661         log_rec_time.start_val = fixrec->flowStartSeconds;
2662         log_rec_time.start_name = "flowStartSeconds";
2663         log_rec_time.end_val = fixrec->flowEndSeconds;
2664         log_rec_time.end_name = "flowEndSeconds";
2665         break;
2666 
2667       case (TMPL_BIT_flowStartMicroseconds | TMPL_BIT_flowEndMicroseconds):
2668         sTime = skiNTPDecode(fixrec->flowStartMicroseconds, 1);
2669         eTime = skiNTPDecode(fixrec->flowEndMicroseconds, 1);
2670         RECORD_SET_TIME_RETURN_NO_LOG(sTime, COMPUTE_DURATION(sTime, eTime));
2671         time_fields = 5;
2672         log_rec_time.start_val = fixrec->flowStartMicroseconds;
2673         log_rec_time.start_name = "flowStartMicroseconds";
2674         log_rec_time.end_val = fixrec->flowEndMicroseconds;
2675         log_rec_time.end_name = "flowEndMicroseconds";
2676         break;
2677 
2678       case (TMPL_BIT_flowStartNanoseconds | TMPL_BIT_flowEndNanoseconds):
2679         sTime = skiNTPDecode(fixrec->flowStartNanoseconds, 0);
2680         eTime = skiNTPDecode(fixrec->flowEndNanoseconds, 0);
2681         RECORD_SET_TIME_RETURN_NO_LOG(sTime, COMPUTE_DURATION(sTime, eTime));
2682         time_fields = 5;
2683         log_rec_time.start_val = fixrec->flowStartNanoseconds;
2684         log_rec_time.start_name = "flowStartNanoseconds";
2685         log_rec_time.end_val = fixrec->flowEndNanoseconds;
2686         log_rec_time.end_name = "flowEndNanoseconds";
2687         break;
2688 
2689       case (TMPL_BIT_flowStartDeltaMicroseconds
2690             | TMPL_BIT_flowEndDeltaMicroseconds):
2691         export_time = sktimeCreate(fBufGetExportTime(fbuf), 0);
2692         sTime = (export_time - fixrec->flowStartDeltaMicroseconds / 1000);
2693         eTime = (export_time - fixrec->flowEndDeltaMicroseconds / 1000);
2694         RECORD_SET_TIME_RETURN_NO_LOG(sTime, COMPUTE_DURATION(sTime, eTime));
2695         time_fields = 13;
2696         log_rec_time.start_val = fixrec->flowStartDeltaMicroseconds;
2697         log_rec_time.start_name = "flowStartDeltaMicroseconds";
2698         log_rec_time.end_val = fixrec->flowEndDeltaMicroseconds;
2699         log_rec_time.end_name = "flowEndDeltaMicroseconds";
2700         break;
2701 
2702       case (TMPL_BIT_flowStartMilliseconds| TMPL_BIT_flowDurationMilliseconds):
2703         sTime = fixrec->flowStartMilliseconds;
2704         duration = fixrec->flowDurationMilliseconds;
2705         RECORD_SET_TIME_RETURN_NO_LOG(sTime, duration);
2706         time_fields = 3;
2707         log_rec_time.start_val = fixrec->flowStartMilliseconds;
2708         log_rec_time.start_name = "flowStartMilliseconds";
2709         log_rec_time.dur_val = fixrec->flowDurationMilliseconds;
2710         log_rec_time.dur_name = "flowDurationMilliseconds";
2711         break;
2712 
2713       case (TMPL_BIT_flowStartMicroseconds| TMPL_BIT_flowDurationMicroseconds):
2714         sTime = skiNTPDecode(fixrec->flowStartMicroseconds, 1);
2715         duration = fixrec->flowDurationMicroseconds / 1000;
2716         RECORD_SET_TIME_RETURN_NO_LOG(sTime, duration);
2717         time_fields = 3;
2718         log_rec_time.start_val = fixrec->flowStartMicroseconds;
2719         log_rec_time.start_name = "flowStartMicroseconds";
2720         log_rec_time.dur_val = fixrec->flowDurationMicroseconds;
2721         log_rec_time.dur_name = "flowDurationMicroseconds";
2722         break;
2723 
2724       case 0:
2725         /* no times, set start to export time and set dur to 0 */
2726         sTime = sktimeCreate(fBufGetExportTime(fbuf), 0);
2727         RECORD_SET_TIME_RETURN_NO_LOG(sTime, 0);
2728         time_fields = 0;
2729         break;
2730 
2731       case (TMPL_BIT_flowStartSysUpTime | TMPL_BIT_systemInitTimeMilliseconds):
2732         /* Times based on flow generator system uptimes (Netflow v9) */
2733 
2734         /* Compute the uptime: systemInitTimeMilliseconds is the
2735          * absolute router boot time (msec), and libfixbuf sets it by
2736          * subtracting the NFv9 uptime (msec) from the record's
2737          * absolute export time (sec). */
2738         export_msec = sktimeCreate(fBufGetExportTime(fbuf), 0);
2739         uptime = export_msec - fixrec->systemInitTimeMilliseconds;
2740         if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_NF9_SYSUPTIME_SECS) {
2741             /* uptime was reported in seconds, not msec */
2742             TRACEMSG(3, (("Before adjustment: exportTimeMillisec %" PRIu64
2743                           ", initTimeMillisec %" PRIu64 ", upTime %" PRIdMAX
2744                           ", startUpTime %" PRIu32 ", endUpTime %" PRIu32
2745                           ", packets %" PRIu32),
2746                          export_msec, fixrec->systemInitTimeMilliseconds,
2747                          uptime, fixrec->flowStartSysUpTime,
2748                          fixrec->flowEndSysUpTime, rwRecGetPkts(rec)));
2749             uptime *= 1000;
2750             record->data.fixrec.systemInitTimeMilliseconds
2751                 = export_msec - uptime;
2752             if (rwRecGetPkts(rec) == 1
2753                 && (fixrec->flowEndSysUpTime < fixrec->flowStartSysUpTime))
2754             {
2755                 /* sometimes the end time for single packet flows is
2756                  * very different than the start time. */
2757                 record->data.fixrec.flowEndSysUpTime
2758                     = record->data.fixrec.flowStartSysUpTime;
2759             }
2760         }
2761 
2762         /* Set the duration. */
2763         if (fixrec->flowStartSysUpTime <= fixrec->flowEndSysUpTime) {
2764             rwRecSetElapsed(rec, (fixrec->flowEndSysUpTime
2765                                   - fixrec->flowStartSysUpTime));
2766         } else {
2767             /* assume EndTime rolled-over and start did not */
2768             rwRecSetElapsed(rec, (ROLLOVER32 + fixrec->flowEndSysUpTime
2769                                   - fixrec->flowStartSysUpTime));
2770             rollover_last = ", assume flowEndSysUpTime rollover";
2771         }
2772         /* Set start time. */
2773         difference = uptime - fixrec->flowStartSysUpTime;
2774         if (difference > MAXIMUM_FLOW_TIME_DEVIATION) {
2775             /* assume upTime is set before record is composed and
2776              * that start-time has rolled over. */
2777             rwRecSetStartTime(rec, (fixrec->systemInitTimeMilliseconds
2778                                     + fixrec->flowStartSysUpTime
2779                                     + ROLLOVER32));
2780             rollover_first = ", assume flowStartSysUpTime rollover";
2781         } else if (-difference > MAXIMUM_FLOW_TIME_DEVIATION) {
2782             /* assume upTime is set after record is composed and
2783              * that upTime has rolled over. */
2784             rwRecSetStartTime(rec, (fixrec->systemInitTimeMilliseconds
2785                                     + fixrec->flowStartSysUpTime
2786                                     - ROLLOVER32));
2787             rollover_first = ", assume sysUpTime rollover";
2788         } else {
2789             /* times look reasonable; assume no roll over */
2790             rwRecSetStartTime(rec, (fixrec->systemInitTimeMilliseconds
2791                                     + fixrec->flowStartSysUpTime));
2792             rollover_first = "";
2793         }
2794         if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS) {
2795             sktimestamp_r(stime_buf, rwRecGetStartTime(rec), SKTIMESTAMP_UTC);
2796             INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
2797                      " flowStartSysUpTime=%" PRIu32
2798                      ", flowEndSysUpTime=%" PRIu32
2799                      ", systemInitTimeMilliseconds=%" PRIu64
2800                      ", exportTimeSeconds=%" PRIu32
2801                      ", calculated sysUpTime=%" PRIdMAX "%s%s"),
2802                     skpcProbeGetName(probe),
2803                     stime_buf, (double)rwRecGetElapsed(rec)/1000,
2804                     fixrec->flowStartSysUpTime, fixrec->flowEndSysUpTime,
2805                     fixrec->systemInitTimeMilliseconds,
2806                     fBufGetExportTime(fbuf), uptime,
2807                     rollover_first, rollover_last);
2808         }
2809         return;
2810 
2811       case (TMPL_BIT_flowStartSysUpTime):
2812         /* Times based on flow generator system uptimes (Netflow v9),
2813          * but there is no system init time and we do not know when
2814          * the router booted.  assume end-time is same as the record's
2815          * export time and set start-time accordingly. */
2816         if (fixrec->flowStartSysUpTime <= fixrec->flowEndSysUpTime) {
2817             rwRecSetElapsed(rec, (fixrec->flowEndSysUpTime
2818                                   - fixrec->flowStartSysUpTime));
2819         } else {
2820             /* assume EndTime rolled-over and start did not */
2821             rwRecSetElapsed(rec, (ROLLOVER32 + fixrec->flowEndSysUpTime
2822                                   - fixrec->flowStartSysUpTime));
2823             rollover_last = ", assume flowEndSysUpTime rollover";
2824         }
2825         /* Set start time. */
2826         export_msec = sktimeCreate(fBufGetExportTime(fbuf), 0);
2827         rwRecSetStartTime(rec, export_msec - rwRecGetElapsed(rec));
2828         if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS) {
2829             sktimestamp_r(stime_buf, rwRecGetStartTime(rec), SKTIMESTAMP_UTC);
2830             INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
2831                      " flowStartSysUpTime=%" PRIu32
2832                      ", flowEndSysUpTime=%" PRIu32
2833                      ", no systemInitTimeMilliseconds"
2834                      ", set end to exportTimeSeconds=%" PRIu32 "%s"),
2835                     skpcProbeGetName(probe),
2836                     stime_buf, (double)rwRecGetElapsed(rec)/1000,
2837                     fixrec->flowStartSysUpTime, fixrec->flowEndSysUpTime,
2838                     fBufGetExportTime(fbuf), rollover_last);
2839         }
2840         return;
2841 
2842       default:
2843         time_fields = 0;
2844         /* look for all possible start times */
2845         if (record->bmap & TMPL_BIT_flowStartMilliseconds) {
2846             time_fields |= 1;
2847             sTime = fixrec->flowStartMilliseconds;
2848             log_rec_time.start_val = fixrec->flowStartMilliseconds;
2849             log_rec_time.start_name = "flowStartMilliseconds";
2850         } else if (record->bmap & TMPL_BIT_flowStartSeconds) {
2851             time_fields |= 1;
2852             sTime = UINT64_C(1000) * (uint64_t)fixrec->flowStartSeconds;
2853             log_rec_time.start_val = fixrec->flowStartSeconds;
2854             log_rec_time.start_name = "flowStartSeconds";
2855         } else if (record->bmap & TMPL_BIT_flowStartMicroseconds) {
2856             time_fields |= 1;
2857             sTime = skiNTPDecode(fixrec->flowStartMicroseconds, 1);
2858             log_rec_time.start_val = fixrec->flowStartMicroseconds;
2859             log_rec_time.start_name = "flowStartMicroseconds";
2860         } else if (record->bmap & TMPL_BIT_flowStartNanoseconds) {
2861             time_fields |= 1;
2862             sTime = skiNTPDecode(fixrec->flowStartNanoseconds, 0);
2863             log_rec_time.start_val = fixrec->flowStartNanoseconds;
2864             log_rec_time.start_name = "flowStartNanoseconds";
2865         } else if (record->bmap & TMPL_BIT_flowStartDeltaMicroseconds) {
2866             time_fields |= 9;
2867             sTime = (fBufGetExportTime(fbuf) * 1000
2868                      - fixrec->flowStartDeltaMicroseconds / 1000);
2869             log_rec_time.start_val = fixrec->flowStartDeltaMicroseconds;
2870             log_rec_time.start_name = "flowStartDeltaMicroseconds";
2871         } else if (record->bmap & TMPL_BIT_flowStartSysUpTime) {
2872             /* must be additional time fields present that prevented
2873              * the switch from matching; mask those out and call this
2874              * function again */
2875             BMAP_TYPE bmap_orig = record->bmap;
2876             record->bmap = bmap_orig & (TMPL_BIT_flowStartSysUpTime
2877                                         | TMPL_BIT_systemInitTimeMilliseconds);
2878             TRACEMSG(1, ("Unusual time fields present on record"));
2879             skiGauntletOfTime(fbuf, record, probe);
2880             record->bmap = bmap_orig;
2881             return;
2882         } else {
2883             sTime = 0;
2884         }
2885 
2886         /* look for all possible end times; if none found look for
2887          * collection/observation times */
2888         if (record->bmap & TMPL_BIT_flowEndMilliseconds) {
2889             time_fields |= 4;
2890             eTime = fixrec->flowEndMilliseconds;
2891             log_rec_time.end_val = fixrec->flowEndMilliseconds;
2892             log_rec_time.end_name = "flowEndMilliseconds";
2893         } else if (record->bmap & TMPL_BIT_flowEndSeconds) {
2894             time_fields |= 4;
2895             eTime = UINT64_C(1000) * (uint64_t)fixrec->flowEndSeconds;
2896             log_rec_time.end_val = fixrec->flowEndSeconds;
2897             log_rec_time.end_name = "flowEndSeconds";
2898         } else if (record->bmap & TMPL_BIT_flowEndMicroseconds) {
2899             time_fields |= 4;
2900             eTime = skiNTPDecode(fixrec->flowEndMicroseconds, 1);
2901             log_rec_time.end_val = fixrec->flowEndMicroseconds;
2902             log_rec_time.end_name = "flowEndMicroseconds";
2903         } else if (record->bmap & TMPL_BIT_flowEndNanoseconds) {
2904             time_fields |= 4;
2905             eTime = skiNTPDecode(fixrec->flowEndNanoseconds, 0);
2906             log_rec_time.end_val = fixrec->flowEndNanoseconds;
2907             log_rec_time.end_name = "flowEndNanoseconds";
2908         } else if (record->bmap & TMPL_BIT_flowEndDeltaMicroseconds) {
2909             time_fields |= 12;
2910             eTime = (fBufGetExportTime(fbuf) * 1000
2911                      - fixrec->flowEndDeltaMicroseconds / 1000);
2912             log_rec_time.end_val = fixrec->flowEndDeltaMicroseconds;
2913             log_rec_time.end_name = "flowEndDeltaMicroseconds";
2914         } else if (record->bmap & TMPL_BIT_collectionTimeMilliseconds) {
2915             time_fields |= 4;
2916             eTime = fixrec->collectionTimeMilliseconds;
2917             log_rec_time.end_val = fixrec->collectionTimeMilliseconds;
2918             log_rec_time.end_name = "collectionTimeMilliseconds";
2919         } else if (record->bmap & TMPL_BIT_observationTimeMilliseconds) {
2920             time_fields |= 4;
2921             eTime = fixrec->observationTimeMilliseconds;
2922             log_rec_time.end_val = fixrec->observationTimeMilliseconds;
2923             log_rec_time.end_name = "observationTimeMilliseconds";
2924         } else if (record->bmap & TMPL_BIT_observationTimeSeconds) {
2925             time_fields |= 4;
2926             eTime = UINT64_C(1000) * (uint64_t)fixrec->observationTimeSeconds;
2927             log_rec_time.end_val = fixrec->observationTimeSeconds;
2928             log_rec_time.end_name = "observationTimeSeconds";
2929         } else if (record->bmap & TMPL_BIT_observationTimeMicroseconds) {
2930             time_fields |= 4;
2931             eTime = skiNTPDecode(fixrec->observationTimeMicroseconds, 1);
2932             log_rec_time.end_val = fixrec->observationTimeMicroseconds;
2933             log_rec_time.end_name = "observationTimeMicroseconds";
2934         } else if (record->bmap & TMPL_BIT_observationTimeNanoseconds) {
2935             time_fields |= 4;
2936             eTime = skiNTPDecode(fixrec->observationTimeNanoseconds, 0);
2937             log_rec_time.end_val = fixrec->observationTimeNanoseconds;
2938             log_rec_time.end_name = "observationTimeNanoseconds";
2939         } else {
2940             eTime = 0;
2941         }
2942 
2943         /* look for durations */
2944         if (record->bmap & TMPL_BIT_flowDurationMilliseconds) {
2945             time_fields |= 2;
2946             duration = fixrec->flowDurationMilliseconds;
2947             log_rec_time.dur_val = fixrec->flowDurationMilliseconds;
2948             log_rec_time.dur_name = "flowDurationMilliseconds";
2949         } else if (record->bmap & TMPL_BIT_flowDurationMicroseconds) {
2950             time_fields |= 2;
2951             duration = fixrec->flowDurationMicroseconds / 1000;
2952             log_rec_time.dur_val = fixrec->flowDurationMicroseconds;
2953             log_rec_time.dur_name = "flowDurationMicroseconds";
2954         } else {
2955             duration = 0;
2956         }
2957 
2958         /* set the time fields on the record */
2959         switch (time_fields) {
2960           case 3: case 7: case 11: case 15:
2961             /* have start and duration; use them; ignore eTime if
2962              * present */
2963             RECORD_SET_TIME_RETURN_NO_LOG(sTime, duration);
2964             break;
2965 
2966           case 5: case 13:
2967             /* have start and end; use them */
2968             RECORD_SET_TIME_RETURN_NO_LOG(
2969                 sTime, COMPUTE_DURATION(sTime, eTime));
2970             break;
2971 
2972           case 1: case 9:
2973             /* only have a start time; use it and set dur to 0 */
2974             RECORD_SET_TIME_RETURN_NO_LOG(sTime, 0);
2975             break;
2976 
2977           case 6: case 14:
2978             /* have dur and end time; compute start */
2979             sTime = (sktime_t)(eTime - duration);
2980             RECORD_SET_TIME_RETURN_NO_LOG(sTime, duration);
2981             break;
2982 
2983           case 2:
2984             /* only have a duration; use export time as end time */
2985             sTime = sktimeCreate(fBufGetExportTime(fbuf),0) - duration;
2986             RECORD_SET_TIME_RETURN_NO_LOG(sTime, duration);
2987             break;
2988 
2989           case 4: case 12:
2990             /* only have an end time; use it as start time and set dur
2991              * to 0 */
2992             RECORD_SET_TIME_RETURN_NO_LOG(eTime, 0);
2993             sTime = eTime;
2994             break;
2995 
2996           case 0:
2997             /* no times, set start to export time and set dur to 0 */
2998             sTime = sktimeCreate(fBufGetExportTime(fbuf), 0);
2999             RECORD_SET_TIME_RETURN_NO_LOG(sTime, 0);
3000             break;
3001 
3002           case 8: case 10:
3003           default:
3004             skAbortBadCase(time_fields);
3005         }
3006         /* close the default: clause */
3007         break;
3008     }
3009 
3010     /* Should only get here when logging was requested */
3011     assert(skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS);
3012     sktimestamp_r(stime_buf, sTime, SKTIMESTAMP_UTC);
3013     switch (time_fields) {
3014       case 3:
3015       case 7:
3016         /* stime, duration, and maybe etime (which is ignored) */
3017         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3018                  " %s=%" PRIu64 ", %s=%" PRIu64),
3019                 skpcProbeGetName(probe), stime_buf,
3020                 (double)rwRecGetElapsed(rec)/1000,
3021                 log_rec_time.start_name,log_rec_time.start_val,
3022                 log_rec_time.dur_name, log_rec_time.dur_val);
3023         break;
3024 
3025       case 11:
3026       case 15:
3027         /* as above, with export time */
3028         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3029                  " %s=%" PRIu64 ", %s=%" PRIu64 ", exportTimeSeconds=%" PRIu32),
3030                 skpcProbeGetName(probe), stime_buf,
3031                 (double)rwRecGetElapsed(rec)/1000,
3032                 log_rec_time.start_name,log_rec_time.start_val,
3033                 log_rec_time.dur_name, log_rec_time.dur_val,
3034                 fBufGetExportTime(fbuf));
3035         break;
3036 
3037       case 5:
3038         /* stime and etime */
3039         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3040                  " %s=%" PRIu64 ", %s=%" PRIu64),
3041                 skpcProbeGetName(probe), stime_buf,
3042                 (double)rwRecGetElapsed(rec)/1000,
3043                 log_rec_time.start_name,log_rec_time.start_val,
3044                 log_rec_time.end_name, log_rec_time.end_val);
3045         break;
3046 
3047       case 13:
3048         /* stime and etime, with export time */
3049         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3050                  " %s=%" PRIu64 ", %s=%" PRIu64 ", exportTimeSeconds=%" PRIu32),
3051                 skpcProbeGetName(probe), stime_buf,
3052                 (double)rwRecGetElapsed(rec)/1000,
3053                 log_rec_time.start_name,log_rec_time.start_val,
3054                 log_rec_time.end_name, log_rec_time.end_val,
3055                 fBufGetExportTime(fbuf));
3056         break;
3057 
3058       case 1:
3059         /* stime only */
3060         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3061                  " %s=%" PRIu64),
3062                 skpcProbeGetName(probe), stime_buf, 0.0,
3063                 log_rec_time.start_name,log_rec_time.start_val);
3064         break;
3065 
3066       case 9:
3067         /* stime only with an export time */
3068         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3069                  " %s=%" PRIu64 ", exportTimeSeconds=%" PRIu32),
3070                 skpcProbeGetName(probe), stime_buf, 0.0,
3071                 log_rec_time.start_name,log_rec_time.start_val,
3072                 fBufGetExportTime(fbuf));
3073         break;
3074 
3075       case 6:
3076         /* duration and end time */
3077         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3078                  " %s=%" PRIu64 ", %s=%" PRIu64),
3079                 skpcProbeGetName(probe), stime_buf,
3080                 (double)rwRecGetElapsed(rec)/1000,
3081                 log_rec_time.dur_name, log_rec_time.dur_val,
3082                 log_rec_time.end_name, log_rec_time.end_val);
3083         break;
3084 
3085       case 14:
3086         /* duration and end time, with an export time */
3087         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3088                  " %s=%" PRIu64 ", %s=%" PRIu64 ", exportTimeSeconds=%" PRIu32),
3089                 skpcProbeGetName(probe), stime_buf,
3090                 (double)rwRecGetElapsed(rec)/1000,
3091                 log_rec_time.dur_name, log_rec_time.dur_val,
3092                 log_rec_time.end_name, log_rec_time.end_val,
3093                 fBufGetExportTime(fbuf));
3094         break;
3095 
3096       case 2:
3097         /* duration only */
3098         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3099                  " %s=%" PRIu64 ", set end to exportTimeSeconds=%" PRIu32),
3100                 skpcProbeGetName(probe), stime_buf,
3101                 (double)rwRecGetElapsed(rec)/1000,
3102                 log_rec_time.dur_name, log_rec_time.dur_val,
3103                 fBufGetExportTime(fbuf));
3104         break;
3105 
3106       case 4:
3107         /* end time only */
3108         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3109                  " %s=%" PRIu64),
3110                 skpcProbeGetName(probe), stime_buf, 0.0,
3111                 log_rec_time.end_name, log_rec_time.end_val);
3112         break;
3113 
3114       case 12:
3115         /* end time only, with export time */
3116         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3117                  " %s=%" PRIu64 ", exportTimeSeconds=%" PRIu32),
3118                 skpcProbeGetName(probe), stime_buf, 0.0,
3119                 log_rec_time.end_name, log_rec_time.end_val,
3120                 fBufGetExportTime(fbuf));
3121         break;
3122 
3123       case 0:
3124         /* no times */
3125         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs based on"
3126                  " exportTimeSeconds=%" PRIu32),
3127                 skpcProbeGetName(probe), stime_buf, 0.0,
3128                 fBufGetExportTime(fbuf));
3129         break;
3130 
3131       case 8:
3132       case 10:
3133       default:
3134         skAbortBadCase(time_fields);
3135     }
3136 }
3137 
3138 
3139 /**
3140  *    Print a log message saying why a ski_fixrec_t record was ignored
3141  */
3142 static void
ski_fixrec_ignore(const ski_fixrec_t * fixrec,const char * reason)3143 ski_fixrec_ignore(
3144     const ski_fixrec_t *fixrec,
3145     const char         *reason)
3146 {
3147     skipaddr_t ipaddr;
3148     char sipbuf[64];
3149     char dipbuf[64];
3150 
3151     if (!SK_IPV6_IS_ZERO(fixrec->sourceIPv6Address)) {
3152 #ifdef SK_HAVE_INET_NTOP
3153         if (!inet_ntop(AF_INET6, &fixrec->sourceIPv6Address,
3154                        sipbuf, sizeof(sipbuf)))
3155 #endif
3156         {
3157             strcpy(sipbuf, "unknown-v6");
3158         }
3159     } else {
3160         skipaddrSetV4(&ipaddr, &fixrec->sourceIPv4Address);
3161         skipaddrString(sipbuf, &ipaddr, SKIPADDR_CANONICAL);
3162     }
3163     if (!SK_IPV6_IS_ZERO(fixrec->destinationIPv6Address)) {
3164 #ifdef SK_HAVE_INET_NTOP
3165         if (!inet_ntop(AF_INET6, &fixrec->destinationIPv6Address,
3166                        dipbuf, sizeof(dipbuf)))
3167 #endif
3168         {
3169             strcpy(dipbuf, "unknown-v6");
3170         }
3171     } else {
3172         skipaddrSetV4(&ipaddr, &fixrec->destinationIPv4Address);
3173         skipaddrString(dipbuf, &ipaddr, SKIPADDR_CANONICAL);
3174     }
3175 
3176     INFOMSG(("IGNORED|%s|%s|%u|%u|%u|%" PRIu64 "|%" PRIu64 "|%s|"),
3177             sipbuf, dipbuf, fixrec->sourceTransportPort,
3178             fixrec->destinationTransportPort,fixrec->protocolIdentifier,
3179             ((fixrec->packetDeltaCount)
3180              ? fixrec->packetDeltaCount
3181              : ((fixrec->packetTotalCount)
3182                 ? fixrec->packetTotalCount
3183                 : fixrec->initiatorPackets)),
3184             ((fixrec->octetDeltaCount)
3185              ? fixrec->octetDeltaCount
3186              : ((fixrec->octetTotalCount)
3187                 ? fixrec->octetTotalCount
3188                 : fixrec->initiatorOctets)),
3189             reason);
3190 }
3191 
3192 
3193 /**
3194  *    Call fBufNext() and transcode the data into the ski_fixrec_spec
3195  *    template, then convert the structure into 0, 1, or 2 SiLK Flow
3196  *    records and fill the record pointers on the 'record' structure.
3197  *    The return value indicates the number of records converted.
3198  *    Return -1 on failure.
3199  *
3200  *    The reverse record is cleared via RWREC_CLEAR() when the return
3201  *    value is 1.
3202  *
3203  *    Return 0 if the IPFIX record should be ignored.  The forward rec
3204  *    will have been cleared; the reverse record is untouched.  A
3205  *    record can be ignored when (1)the record is IPv6 and SiLK is
3206  *    compiled without IPv6 support, (2)the record has a packet and/or
3207  *    byte count of 0, or (3)the record is explicitly marked as an
3208  *    "intermediate" record by yaf.
3209  *
3210  *    Return -1 on failure.  The forward rec will have been cleared;
3211  *    the reverse record is untouched.
3212  */
3213 static int
ski_fixrec_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)3214 ski_fixrec_next(
3215     fBuf_t                 *fbuf,
3216     ski_record_t           *record,
3217     const skpc_probe_t     *probe,
3218     GError                **err)
3219 {
3220     fbSubTemplateMultiListEntry_t *stml;
3221     ski_fixrec_t *fixrec;
3222     size_t len;
3223     uint64_t pkts, bytes;
3224     uint64_t rev_pkts, rev_bytes;
3225     uint8_t tcp_state;
3226     uint8_t tcp_flags;
3227     int have_tcp_stml = 0;
3228     rwRec *fwd_rec;
3229 
3230     DEFINE_PREFIX_BUF(prefix);
3231     makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
3232                        fbSessionGetDomain(fBufGetSession(fbuf)),
3233                        record->tid, record->tmpl);
3234     TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_fixrec_next()"),
3235                  prefix, record->bmap));
3236     assert(SKI_RECTYPE_FIXREC == record->rectype);
3237 
3238     /* Get a local handle to the record and clear it */
3239     fwd_rec = record->fwd_rec;
3240     RWREC_CLEAR(fwd_rec);
3241 
3242     /* Set internal template to read an extended flow record */
3243     if (!fBufSetInternalTemplate(fbuf, SKI_FIXREC_TID, err)) {
3244         return -1;
3245     }
3246 
3247     /* Get the next record */
3248     len = sizeof(record->data.fixrec);
3249     if (!fBufNext(fbuf, (uint8_t *)&record->data.fixrec, &len, err)) {
3250         return -1;
3251     }
3252     assert(len == sizeof(ski_fixrec_t));
3253     fixrec = &record->data.fixrec;
3254 
3255     if ((fixrec->flowEndReason & SKI_END_MASK)== SKI_END_YAF_INTERMEDIATE_FLOW)
3256     {
3257         TRACEMSG(2, ("Ignored YAF intermediate uniflow"));
3258         return 0;
3259     }
3260 
3261     /* Ignore records with no IPs.  Ignore records that do not have
3262      * IPv4 addresses when SiLK was built without IPv6 support. */
3263     if (record->bmap & TMPL_BIT_sourceIPv4Address) {
3264         /* we're good */
3265     } else if (record->bmap & TMPL_BIT_sourceIPv6Address) {
3266 #if !SK_ENABLE_IPV6
3267         ski_fixrec_ignore(fixrec, "IPv6 record");
3268         return 0;
3269 #endif  /* SK_ENABLE_IPV6 */
3270     } else if ((skpcProbeGetQuirks(probe) & SKPC_QUIRK_MISSING_IPS) == 0) {
3271         ski_fixrec_ignore(fixrec, "No IP addresses");
3272         return 0;
3273     }
3274 
3275     if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_NF9_OUT_IS_REVERSE) {
3276         TRACEMSG(
3277             2, (("Setting reverse Octet/Packet counts (currently"
3278                  " %" PRIu64 "/%" PRIu64 ") to post Octet/Packet counts"
3279                  " (%" PRIu64 "/%" PRIu64 ") due to nf9-out-is-reverse"),
3280                 fixrec->reverseOctetDeltaCount,fixrec->reversePacketDeltaCount,
3281                 fixrec->postOctetDeltaCount, fixrec->postPacketDeltaCount));
3282         fixrec->reverseOctetDeltaCount = fixrec->postOctetDeltaCount;
3283         fixrec->reversePacketDeltaCount = fixrec->postPacketDeltaCount;
3284         fixrec->postOctetDeltaCount = fixrec->postPacketDeltaCount = 0;
3285     }
3286 
3287     /* Get the forward and reverse packet and byte counts (run the
3288      * Gauntlet of Volume). */
3289     pkts = ((fixrec->packetDeltaCount)
3290             ? fixrec->packetDeltaCount
3291             : ((fixrec->packetTotalCount)
3292                ? fixrec->packetTotalCount
3293                : ((fixrec->initiatorPackets)
3294                   ? fixrec->initiatorPackets
3295                   : ((fixrec->postPacketDeltaCount)
3296                      ? fixrec->postPacketDeltaCount
3297                      : fixrec->postPacketTotalCount))));
3298     bytes = ((fixrec->octetDeltaCount)
3299              ? fixrec->octetDeltaCount
3300              : ((fixrec->octetTotalCount)
3301                 ? fixrec->octetTotalCount
3302                 : ((fixrec->initiatorOctets)
3303                    ? fixrec->initiatorOctets
3304                    : ((fixrec->postOctetDeltaCount)
3305                       ? fixrec->postOctetDeltaCount
3306                       : fixrec->postOctetTotalCount))));
3307 
3308     /* I suppose we could add checks for
3309      * reversePost{Packet,Octet}{Delta,Total}Count here as well. */
3310     rev_pkts = ((fixrec->reversePacketDeltaCount)
3311                 ? fixrec->reversePacketDeltaCount
3312                 : ((fixrec->reversePacketTotalCount)
3313                    ? fixrec->reversePacketTotalCount
3314                    : fixrec->responderPackets));
3315     rev_bytes = ((fixrec->reverseOctetDeltaCount)
3316                  ? fixrec->reverseOctetDeltaCount
3317                  : ((fixrec->reverseOctetTotalCount)
3318                     ? fixrec->reverseOctetTotalCount
3319                     : fixrec->responderOctets));
3320 
3321     /*
3322      *  Handle records that represent a "firewall event" when the
3323      *  SKPC_QUIRK_FW_EVENT quirks value is set on the probe.  When
3324      *  the quirk is not set, process the records normally.
3325      *
3326      *  This code changed in SiLK 3.8.0.  Prior to SiLK 3.8.0, all
3327      *  firewall event status messages were dropped.
3328      *
3329      *  It seems that every record from a Cisco ASA has
3330      *  <strike>NF_F_FW_EVENT</strike> and NF_F_FW_EXT_EVENT
3331      *  information elements, so ignoring flow records with these
3332      *  elements means ignoring all flow records.
3333      *
3334      *  It now (2015-June) seems that the NF_F_FW_EVENT information
3335      *  element mentioned in the previous paragraph has been replaced
3336      *  with firewallEvent (IE 233).
3337      *
3338      *  firewallEvent is an official IPFIX information element, IE 233
3339      *
3340      *  NF_F_FW_EVENT is Cisco IE 40005
3341      *
3342      *  NF_F_FW_EXT_EVENT is Cisco IE 33002.
3343      *
3344      *  Note that the Cisco IE numbers cannot be used in IPFIX because
3345      *  IPFIX would treat them as "reverse" records.
3346      *
3347      *  References (October 2013):
3348      *  http://www.cisco.com/en/US/docs/security/asa/asa82/netflow/netflow.html#wp1028202
3349      *  http://www.cisco.com/en/US/docs/security/asa/asa84/system/netflow/netflow.pdf
3350      *
3351      *  Values for the NF_F_FW_EXT_EVENT depend on the values for the
3352      *  firewallEvent or NF_F_FW_EVENT.  The following lists the
3353      *  FW_EVENT with sub-bullets for the NF_F_FW_EXT_EVENT.
3354      *
3355      *  0.  Ignore -- This value indicates that a field must be
3356      *      ignored.
3357      *
3358      *      0.  Ignore -- This value indicates that the field must be
3359      *          ignored.
3360      *
3361      *  1.  Flow created -- This value indicates that a new flow was
3362      *      created.
3363      *
3364      *  2.  Flow deleted -- This value indicates that a flow was
3365      *      deleted.
3366      *
3367      *    >2000.  Values above 2000 represent various reasons why a
3368      *            flow was terminated.
3369      *
3370      *  3.  Flow denied -- This value indicates that a flow was
3371      *      denied.
3372      *
3373      *    >1000.  Values above 1000 represent various reasons why a
3374      *            flow was denied.
3375      *
3376      *     1001.  A flow was denied by an ingress ACL.
3377      *
3378      *     1002.  A flow was denied by an egress ACL.
3379      *
3380      *     1003.  The ASA denied an attempt to connect to the (ASA's)
3381      *            interface service.
3382      *
3383      *     1004.  The flow was denied because the first packet on the
3384      *            TCP was not a TCP SYN packet.
3385      *
3386      *  5.  Flow updated -- This value indicates that a flow update
3387      *      timer went off or a flow was torn down.
3388      *
3389      *  The IPFIX values for the firewallEvent IE follow those for
3390      *  NF_F_FW_EVENT (with IPFIX providing no explanation as to what
3391      *  the values mean! --- some standard) and IPFIX adds the value:
3392      *
3393      *  4.  Flow alert.
3394      *
3395      *  PROCESSING RULES:
3396      *
3397      *  The term "ignore" below means that a log message is written
3398      *  and that no SiLK flow record is created.
3399      *
3400      *  Ignore flow records where the "flow ignore" event is present.
3401      *
3402      *  Treat records where "flow deleted" is specified as actual flow
3403      *  records to be processed and stored.
3404      *
3405      *  Ignore "flow created" events, since we will handle these flows
3406      *  when the "flow deleted" event occurs.  Also, a short-lived
3407      *  flow record may produce a "flow deleted" event without a "flow
3408      *  created" event.
3409      *
3410      *  For a "flow denied" event, write a special value into the SiLK
3411      *  Flow record that the writing thread can use to categorize the
3412      *  record as innull/outnull.
3413      *
3414      *  It is unclear how to handle "flow updated" events. If the
3415      *  record is only being updated, presumably SiLK will get a "flow
3416      *  deleted" event in the future.  However, if the flow is being
3417      *  torn down, will the ASA send a separate "flow deleted" event?
3418      *  For now (as of SiLK 3.8.0), ignore "flow updated" events.
3419      *
3420      *  Ignore "flow alert" events.
3421      *
3422      *
3423      *  Firewall events, byte and packet counts, and the Cisco ASA:
3424      *
3425      *  1.  Flow created events have a byte and packet count of 0;
3426      *  this is fine since we are ignoring these flows.
3427      *
3428      *  2.  Flow deinied events have a byte and packet count of 0.
3429      *  SiLK will ignore these flows unless we doctor them to have a
3430      *  non-zero byte and packet count, which we do when the ASA hack
3431      *  is enabled.
3432      *
3433      *  3.  Flow deleted events have a packet count of 0, but we have
3434      *  code below to work around that when the ASA hack is enabled.
3435      *  The flows usally have a non-zero byte count.  However, some
3436      *  flow records have a 0-byte count, and (July 2015) we have been
3437      *  told one source of these records are packets to an un-opened
3438      *  port.  Previouly these flows were ignored, but as of SiLK
3439      *  3.11.0 we doctor the records to have a byte count of 1.
3440      */
3441     if ((skpcProbeGetQuirks(probe) & SKPC_QUIRK_FW_EVENT)
3442         && (record->bmap & (TMPL_BIT_firewallEvent | TMPL_BIT_NF_F_FW_EVENT
3443                             | TMPL_BIT_NF_F_FW_EXT_EVENT)))
3444     {
3445         char msg[64];
3446         uint8_t event = (fixrec->firewallEvent
3447                          ? fixrec->firewallEvent : fixrec->NF_F_FW_EVENT);
3448         if (SKIPFIX_FW_EVENT_DELETED == event) {
3449             /* flow deleted */
3450             TRACEMSG(1,(("Processing flow deleted event as actual flow record;"
3451                          " firewallEvent=%u, NF_F_FW_EVENT=%u,"
3452                          " NF_F_FW_EXT_EVENT=%u"),
3453                       fixrec->firewallEvent, fixrec->NF_F_FW_EVENT,
3454                       fixrec->NF_F_FW_EXT_EVENT));
3455             /* these normally have a byte count, but not always */
3456             if (0 == bytes) {
3457                 if (0 == pkts) {
3458                     TRACEMSG(1, ("Setting forward bytes and packets to 1"
3459                                  " for deleted firewall event"));
3460                     bytes = 1;
3461                     pkts = 1;
3462                 } else {
3463                     TRACEMSG(1, ("Setting forward bytes equal to packets value"
3464                                  " for deleted firewall event"));
3465                     bytes = pkts;
3466                 }
3467             } else {
3468                 /* there is a forward byte count */
3469                 if (0 == pkts) {
3470                     TRACEMSG(1, ("Setting forward packets to 1"));
3471                     pkts = 1;
3472                 }
3473                 if (rev_bytes) {
3474                     /* there is a reverse byte count */
3475                     if (0 == rev_pkts) {
3476                         TRACEMSG(1, ("Setting reverse packets to 1"));
3477                         rev_pkts = 1;
3478                     }
3479                 }
3480             }
3481 
3482         } else if (SKIPFIX_FW_EVENT_DENIED == event) {
3483             /* flow denied */
3484             TRACEMSG(1, (("Processing flow denied event as actual flow record;"
3485                           " firewallEvent=%u, NF_F_FW_EVENT=%u,"
3486                           " NF_F_FW_EXT_EVENT=%u"),
3487                          fixrec->firewallEvent, fixrec->NF_F_FW_EVENT,
3488                          fixrec->NF_F_FW_EXT_EVENT));
3489             if (SKIPFIX_FW_EVENT_DENIED_CHECK_VALID(fixrec->NF_F_FW_EXT_EVENT))
3490             {
3491                 rwRecSetMemo(fwd_rec, fixrec->NF_F_FW_EXT_EVENT);
3492             } else {
3493                 rwRecSetMemo(fwd_rec, event);
3494             }
3495             /* flow denied events from the Cisco ASA have zero in the
3496              * bytes and packets field */
3497             if (0 == pkts) {
3498                 TRACEMSG(1, ("Setting forward bytes and packets to 1"
3499                              " for denied firewall event"));
3500                 bytes = 1;
3501                 pkts = 1;
3502             } else if (0 == bytes) {
3503                 TRACEMSG(1, ("Setting forward bytes equal to packets value"
3504                              " for denied firewall event"));
3505                 bytes = pkts;
3506             }
3507 
3508         } else {
3509             /* flow created, flow updated, flow alert, or something
3510              * unexpected */
3511             if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_FIREWALL) {
3512                 snprintf(msg, sizeof(msg), "firewallEvent=%u,extended=%u",
3513                          event, fixrec->NF_F_FW_EXT_EVENT);
3514                 ski_fixrec_ignore(fixrec, msg);
3515             }
3516             return 0;
3517         }
3518     }
3519 
3520     /* FIXME.  What if the record has a flowDirection field that is
3521      * set to egress (0x01)?  Shouldn't we handle that by reversing
3522      * the record?  Or has fixbuf done that for us? */
3523 
3524     if (0 == bytes && 0 == rev_bytes) {
3525 #if 0
3526         /* flow denied events from the Cisco ASA have zero in the
3527          * bytes and packets field */
3528         if ((skpcProbeGetQuirks(probe) & SKPC_QUIRK_FW_EVENT)
3529             && 0 == pkts
3530             && (SKIPFIX_FW_EVENT_DENIED == fixrec->NF_F_FW_EVENT
3531                 || SKIPFIX_FW_EVENT_DENIED == fixrec->firewallEvent))
3532         {
3533             TRACEMSG(1, ("Setting forward bytes and packets to 1"
3534                          " for denied firewall event"));
3535             bytes = 1;
3536             pkts = 1;
3537         } else
3538 #endif  /* 0 */
3539         {
3540             ski_fixrec_ignore(fixrec, "no forward/reverse octets");
3541             return 0;
3542         }
3543     }
3544 
3545     if (0 == pkts && 0 == rev_pkts) {
3546         if ((skpcProbeGetQuirks(probe) & SKPC_QUIRK_ZERO_PACKETS) == 0) {
3547             /* Ignore records with no volume. */
3548             ski_fixrec_ignore(fixrec, "no forward/reverse packets");
3549             return 0;
3550         }
3551 
3552         /* attempt to handle NetFlowV9 records from an ASA router that
3553          * have no packet count.  The code assumes all records from an
3554          * ASA have a byte count, though this is not always true. */
3555         if (bytes) {
3556             /* there is a forward byte count */
3557             if (0 == pkts) {
3558                 TRACEMSG(1, ("Setting forward packets to 1"));
3559                 pkts = 1;
3560             }
3561         }
3562         if (rev_bytes) {
3563             /* there is a reverse byte count */
3564             if (0 == rev_pkts) {
3565                 TRACEMSG(1, ("Setting reverse packets to 1"));
3566                 rev_pkts = 1;
3567             }
3568         }
3569     }
3570 
3571     /* If the TCP flags are in a subTemplateMultiList, copy them from
3572      * the list and into the record.  The fixbuf.stml gets initialized
3573      * by the call to fBufNext().*/
3574     stml = NULL;
3575     while ((stml = fbSubTemplateMultiListGetNextEntry(&fixrec->stml, stml))) {
3576         if (SKI_TCP_STML_TID != stml->tmplID) {
3577             fbSubTemplateMultiListEntryNextDataPtr(stml, NULL);
3578         } else {
3579             ski_tcp_stml_t *tcp = NULL;
3580             tcp = ((ski_tcp_stml_t*)
3581                    fbSubTemplateMultiListEntryNextDataPtr(stml, tcp));
3582             fixrec->initialTCPFlags = tcp->initialTCPFlags;
3583             fixrec->unionTCPFlags = tcp->unionTCPFlags;
3584             fixrec->reverseInitialTCPFlags = tcp->reverseInitialTCPFlags;
3585             fixrec->reverseUnionTCPFlags = tcp->reverseUnionTCPFlags;
3586             have_tcp_stml = 1;
3587         }
3588     }
3589     fbSubTemplateMultiListClear(&fixrec->stml);
3590 
3591     if (pkts && bytes) {
3592         /* We have forward information. */
3593         TRACEMSG(1, ("Read a forward fixrec record"));
3594 
3595         /* Handle the IP addresses */
3596 #if SK_ENABLE_IPV6
3597         /* Use the IPv6 addresses if they are present and either there
3598          * are no IPv4 addresses or the IPv6 addresses are non-zero. */
3599         if ((record->bmap & TMPL_BIT_sourceIPv6Address)
3600             && (!(record->bmap & TMPL_BIT_sourceIPv4Address)
3601                 || !SK_IPV6_IS_ZERO(fixrec->sourceIPv6Address)
3602                 || !SK_IPV6_IS_ZERO(fixrec->destinationIPv6Address)))
3603         {
3604             /* Values found in IPv6 addresses--use them */
3605             rwRecSetIPv6(fwd_rec);
3606             rwRecMemSetSIPv6(fwd_rec, &fixrec->sourceIPv6Address);
3607             rwRecMemSetDIPv6(fwd_rec, &fixrec->destinationIPv6Address);
3608             rwRecMemSetNhIPv6(fwd_rec, &fixrec->ipNextHopIPv6Address);
3609         } else
3610 #endif /* SK_ENABLE_IPV6 */
3611         {
3612             /* Take values from IPv4 */
3613             rwRecSetSIPv4(fwd_rec, fixrec->sourceIPv4Address);
3614             rwRecSetDIPv4(fwd_rec, fixrec->destinationIPv4Address);
3615             rwRecSetNhIPv4(fwd_rec, fixrec->ipNextHopIPv4Address);
3616         }
3617 
3618         /* Handle the Protocol and Ports */
3619         rwRecSetProto(fwd_rec, fixrec->protocolIdentifier);
3620 
3621         if (!rwRecIsICMP(fwd_rec)
3622             || (!(record->bmap & (TMPL_BIT_icmpTypeCodeIPv4
3623                                   | TMPL_BIT_icmpTypeIPv4))))
3624         {
3625             rwRecSetSPort(fwd_rec, fixrec->sourceTransportPort);
3626             rwRecSetDPort(fwd_rec, fixrec->destinationTransportPort);
3627 
3628         } else if (record->bmap & TMPL_BIT_icmpTypeCodeIPv4) {
3629             rwRecSetSPort(fwd_rec, 0);
3630 #if SK_ENABLE_IPV6
3631             if (rwRecIsIPv6(fwd_rec)) {
3632                 rwRecSetDPort(fwd_rec, fixrec->icmpTypeCodeIPv6);
3633             } else
3634 #endif  /* SK_ENABLE_IPV6 */
3635             {
3636                 rwRecSetDPort(fwd_rec, fixrec->icmpTypeCodeIPv4);
3637             }
3638 
3639         } else if (record->bmap & TMPL_BIT_icmpTypeIPv4) {
3640             /* record has at least one of: icmpTypeIPv4 icmpCodeIPv4,
3641              * icmpTypeIPv6, icmpCodeIPv6 */
3642             rwRecSetSPort(fwd_rec, 0);
3643 #if SK_ENABLE_IPV6
3644             if (rwRecIsIPv6(fwd_rec)) {
3645                 rwRecSetDPort(fwd_rec, ((fixrec->icmpTypeIPv6 << 8)
3646                                         | fixrec->icmpCodeIPv6));
3647             } else
3648 #endif  /* SK_ENABLE_IPV6 */
3649             {
3650                 rwRecSetDPort(fwd_rec, ((fixrec->icmpTypeIPv4 << 8)
3651                                         | fixrec->icmpCodeIPv4));
3652             }
3653         } else {
3654             skAbort();
3655         }
3656 
3657         /* Handle the SNMP or VLAN interfaces */
3658         if (SKPC_IFVALUE_SNMP == skpcProbeGetInterfaceValueType(probe)) {
3659             rwRecSetInput(fwd_rec, CLAMP_VAL16(fixrec->ingressInterface));
3660             rwRecSetOutput(fwd_rec, CLAMP_VAL16(fixrec->egressInterface));
3661         } else {
3662             rwRecSetInput(fwd_rec, fixrec->vlanId);
3663             rwRecSetOutput(fwd_rec, fixrec->postVlanId);
3664         }
3665 
3666         /* Store volume, clamping counts to 32 bits. */
3667         rwRecSetPkts(fwd_rec, CLAMP_VAL32(pkts));
3668         rwRecSetBytes(fwd_rec, CLAMP_VAL32(bytes));
3669 
3670     } else if (rev_pkts && rev_bytes) {
3671         /* We have no forward information, only reverse.  Write the
3672          * source and dest values from the IPFIX record to SiLK's dest
3673          * and source fields, respectively. */
3674         TRACEMSG(1, ("Read a reverse-only fixrec record"));
3675 
3676         /* Store volume, clamping counts to 32 bits. */
3677         rwRecSetPkts(fwd_rec, CLAMP_VAL32(rev_pkts));
3678         rwRecSetBytes(fwd_rec, CLAMP_VAL32(rev_bytes));
3679 
3680         /* This cannot be a bi-flow.  Clear rev_pkts and rev_bytes
3681          * variables now. We check this in the rev_rec code
3682          * below. */
3683         rev_pkts = rev_bytes = 0;
3684 
3685         /* Handle the IP addresses */
3686 #if SK_ENABLE_IPV6
3687         if ((record->bmap & TMPL_BIT_sourceIPv6Address)
3688             && (!(record->bmap & TMPL_BIT_sourceIPv4Address)
3689                 || !SK_IPV6_IS_ZERO(fixrec->sourceIPv6Address)
3690                 || !SK_IPV6_IS_ZERO(fixrec->destinationIPv6Address)))
3691         {
3692             /* Values found in IPv6 addresses--use them */
3693             rwRecSetIPv6(fwd_rec);
3694             rwRecMemSetSIPv6(fwd_rec, &fixrec->destinationIPv6Address);
3695             rwRecMemSetDIPv6(fwd_rec, &fixrec->sourceIPv6Address);
3696             rwRecMemSetNhIPv6(fwd_rec, &fixrec->ipNextHopIPv6Address);
3697         } else
3698 #endif /* SK_ENABLE_IPV6 */
3699         {
3700             /* Take values from IPv4 */
3701             rwRecSetSIPv4(fwd_rec, fixrec->destinationIPv4Address);
3702             rwRecSetDIPv4(fwd_rec, fixrec->sourceIPv4Address);
3703             rwRecSetNhIPv4(fwd_rec, fixrec->ipNextHopIPv4Address);
3704         }
3705 
3706         /* Handle the Protocol and Ports */
3707         rwRecSetProto(fwd_rec, fixrec->protocolIdentifier);
3708         if (!rwRecIsICMP(fwd_rec)) {
3709             rwRecSetSPort(fwd_rec, fixrec->destinationTransportPort);
3710             rwRecSetDPort(fwd_rec, fixrec->sourceTransportPort);
3711         } else if (record->bmap & TMPL_BIT_icmpTypeCodeIPv4) {
3712             rwRecSetSPort(fwd_rec, 0);
3713 #if SK_ENABLE_IPV6
3714             if (rwRecIsIPv6(fwd_rec)) {
3715                 rwRecSetDPort(fwd_rec, fixrec->icmpTypeCodeIPv6);
3716             } else
3717 #endif  /* SK_ENABLE_IPV6 */
3718             {
3719                 rwRecSetDPort(fwd_rec, fixrec->icmpTypeCodeIPv4);
3720             }
3721         } else if (record->bmap & TMPL_BIT_icmpTypeIPv4) {
3722             /* record has at least one of: icmpTypeIPv4 icmpCodeIPv4,
3723              * icmpTypeIPv6, icmpCodeIPv6 */
3724             rwRecSetSPort(fwd_rec, 0);
3725 #if SK_ENABLE_IPV6
3726             if (rwRecIsIPv6(fwd_rec)) {
3727                 rwRecSetDPort(fwd_rec, ((fixrec->icmpTypeIPv6 << 8)
3728                                         | fixrec->icmpCodeIPv6));
3729             } else
3730 #endif  /* SK_ENABLE_IPV6 */
3731             {
3732                 rwRecSetDPort(fwd_rec, ((fixrec->icmpTypeIPv4 << 8)
3733                                         | fixrec->icmpCodeIPv4));
3734             }
3735         } else {
3736             /* For an ICMP record, put whichever Port field is
3737              * non-zero into the record's dPort field */
3738             rwRecSetSPort(fwd_rec, 0);
3739             rwRecSetDPort(fwd_rec, (fixrec->destinationTransportPort
3740                                     ? fixrec->destinationTransportPort
3741                                     : fixrec->sourceTransportPort));
3742         }
3743 
3744         /* Handle the SNMP or VLAN interfaces */
3745         if (SKPC_IFVALUE_SNMP == skpcProbeGetInterfaceValueType(probe)) {
3746             rwRecSetInput(fwd_rec, CLAMP_VAL16(fixrec->egressInterface));
3747             rwRecSetOutput(fwd_rec, CLAMP_VAL16(fixrec->ingressInterface));
3748         } else {
3749             if (record->bmap & TMPL_BIT_reverseVlanId) {
3750                 /* If we have the reverse elements, use them */
3751                 rwRecSetInput(fwd_rec, fixrec->reverseVlanId);
3752                 rwRecSetOutput(fwd_rec, fixrec->reversePostVlanId);
3753             } else if (record->bmap & TMPL_BIT_postVlanId) {
3754                 /* If we have a single vlanId, set 'input' to that value;
3755                  * otherwise, set 'input' to postVlanId and 'output' to
3756                  * vlanId. */
3757                 rwRecSetInput(fwd_rec, fixrec->postVlanId);
3758                 rwRecSetOutput(fwd_rec, fixrec->vlanId);
3759             } else {
3760                 /* we have a single vlanId, so don't swap the values */
3761                 rwRecSetInput(fwd_rec, fixrec->vlanId);
3762             }
3763         }
3764 
3765     } else {
3766         TRACEMSG(2, (("Found zero bytes or packets; byte=%" PRIu64 ", pkt="
3767                       "%" PRIu64 ", rev_byte=%" PRIu64 ", rev_pkt=%" PRIu64),
3768                      bytes, pkts, rev_bytes, rev_pkts));
3769         ski_fixrec_ignore(fixrec, "byte or packet count is zero");
3770         return 0;
3771     }
3772 
3773     skiGauntletOfTime(fbuf, record, probe);
3774 
3775     /* Copy the remainder of the record */
3776     rwRecSetFlowType(fwd_rec, fixrec->silkFlowType);
3777     rwRecSetSensor(fwd_rec, fixrec->silkFlowSensor);
3778     rwRecSetApplication(fwd_rec, fixrec->silkAppLabel);
3779 
3780     tcp_state = fixrec->silkTCPState;
3781     tcp_flags = (fixrec->initialTCPFlags | fixrec->unionTCPFlags);
3782 
3783     /* Ensure the SK_TCPSTATE_EXPANDED bit is properly set. */
3784     if (tcp_flags && IPPROTO_TCP == rwRecGetProto(fwd_rec)) {
3785         /* Flow is TCP and init|session flags had a value. */
3786         rwRecSetFlags(fwd_rec, tcp_flags);
3787         rwRecSetInitFlags(fwd_rec, fixrec->initialTCPFlags);
3788         rwRecSetRestFlags(fwd_rec, fixrec->unionTCPFlags);
3789         tcp_state |= SK_TCPSTATE_EXPANDED;
3790     } else {
3791         /* clear bit when not TCP or no separate init/session flags */
3792         tcp_state &= ~SK_TCPSTATE_EXPANDED;
3793         /* use whatever all-flags we were given; leave initial-flags
3794          * and session-flags unset */
3795         rwRecSetFlags(fwd_rec, fixrec->tcpControlBits);
3796     }
3797 
3798     /* Process the flowEndReason and flowAttributes unless one of
3799      * those bits is already set (via silkTCPState). */
3800     if (!(tcp_state
3801           & (SK_TCPSTATE_FIN_FOLLOWED_NOT_ACK | SK_TCPSTATE_TIMEOUT_KILLED
3802              | SK_TCPSTATE_TIMEOUT_STARTED | SK_TCPSTATE_UNIFORM_PACKET_SIZE)))
3803     {
3804         /* Note active timeout */
3805         if ((fixrec->flowEndReason & SKI_END_MASK) == SKI_END_ACTIVE) {
3806             tcp_state |= SK_TCPSTATE_TIMEOUT_KILLED;
3807         }
3808         /* Note continuation */
3809         if (fixrec->flowEndReason & SKI_END_ISCONT) {
3810             tcp_state |= SK_TCPSTATE_TIMEOUT_STARTED;
3811         }
3812         /* Note flows with records of uniform size */
3813         if (fixrec->flowAttributes & SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE) {
3814             tcp_state |= SK_TCPSTATE_UNIFORM_PACKET_SIZE;
3815         }
3816         rwRecSetTcpState(fwd_rec, tcp_state);
3817     }
3818 
3819     rwRecSetTcpState(fwd_rec, tcp_state);
3820 
3821     /* Handle the reverse record if there is one in the IPFIX record,
3822      * which is indicated by the value of 'rev_bytes'.*/
3823     if (0 == rev_bytes) {
3824         /* No data for reverse direction; just clear the record. */
3825         RWREC_CLEAR(record->rev_rec);
3826     } else {
3827         rwRec *rev_rec;
3828         rev_rec = record->rev_rec;
3829 
3830         /* We have data for reverse direction. */
3831         TRACEMSG(1, ("Handling reverse side of bi-flow fixrec record"));
3832 
3833 #define COPY_FORWARD_REC_TO_REVERSE 1
3834 #if COPY_FORWARD_REC_TO_REVERSE
3835         /* Initialize the reverse record with the forward
3836          * record  */
3837         RWREC_COPY(rev_rec, fwd_rec);
3838 #else
3839         /* instead of copying the forward record and changing
3840          * nearly everything, we could just set these fields on
3841          * the reverse record. */
3842         rwRecSetProto(rev_rec, fixrec->protocolIdentifier);
3843         rwRecSetFlowType(rev_rec, fixrec->silkFlowType);
3844         rwRecSetSensor(rev_rec, fixrec->silkFlowSensor);
3845         rwRecSetTcpState(rev_rec, fixrec->silkTCPState);
3846         rwRecSetApplication(rev_rec, fixrec->silkAppLabel);
3847         /* does using the forward nexthop IP for the reverse
3848          * record make any sense?  Shouldn't we check for a
3849          * reverse next hop address? */
3850 #if SK_ENABLE_IPV6
3851         if (rwRecIsIPv6(fwd_rec)) {
3852             rwRecSetIPv6(rev_rec);
3853             rwRecMemSetNhIPv6(rev_rec, &fixrec->ipNextHopIPv6Address);
3854         } else
3855 #endif
3856         {
3857             rwRecSetNhIPv4(rev_rec, &fixrec->ipNextHopIPv4Address);
3858         }
3859 #endif  /* #else clause of #if COPY_FORWARD_REC_TO_REVERSE */
3860 
3861         /* Reverse the IPs */
3862 #if SK_ENABLE_IPV6
3863         if (rwRecIsIPv6(fwd_rec)) {
3864             rwRecMemSetSIPv6(rev_rec, &fixrec->destinationIPv6Address);
3865             rwRecMemSetDIPv6(rev_rec, &fixrec->sourceIPv6Address);
3866         } else
3867 #endif
3868         {
3869             rwRecSetSIPv4(rev_rec, fixrec->destinationIPv4Address);
3870             rwRecSetDIPv4(rev_rec, fixrec->sourceIPv4Address);
3871         }
3872 
3873         /* Reverse the ports unless this is an ICMP record */
3874         if (!rwRecIsICMP(fwd_rec)) {
3875             rwRecSetSPort(rev_rec, rwRecGetDPort(fwd_rec));
3876             rwRecSetDPort(rev_rec, rwRecGetSPort(fwd_rec));
3877         }
3878 
3879         /* Reverse the SNMP or VLAN interfaces */
3880         if (SKPC_IFVALUE_SNMP == skpcProbeGetInterfaceValueType(probe)) {
3881             rwRecSetInput(rev_rec, rwRecGetOutput(fwd_rec));
3882             rwRecSetOutput(rev_rec, rwRecGetInput(fwd_rec));
3883         } else if (record->bmap & TMPL_BIT_reverseVlanId) {
3884             /* Reverse VLAN values exist.  Use them */
3885             rwRecSetInput(rev_rec, fixrec->reverseVlanId);
3886             rwRecSetOutput(rev_rec, fixrec->reversePostVlanId);
3887         } else if (record->bmap & TMPL_BIT_postVlanId) {
3888             /* Reverse the forward values */
3889             rwRecSetInput(rev_rec, fixrec->postVlanId);
3890             rwRecSetOutput(rev_rec, fixrec->vlanId);
3891         } else {
3892             /* we have a single vlanId, so don't swap the values */
3893             rwRecSetInput(rev_rec, fixrec->vlanId);
3894         }
3895 
3896         /* Set volume.  We retrieved them above */
3897         rwRecSetPkts(rev_rec, CLAMP_VAL32(rev_pkts));
3898         rwRecSetBytes(rev_rec, CLAMP_VAL32(rev_bytes));
3899 
3900         /* Calculate reverse start time from reverse RTT */
3901 
3902         /* Reverse flow's start time must be increased and its
3903          * duration decreased by its offset from the forward
3904          * record  */
3905         rwRecSetStartTime(rev_rec, (rwRecGetStartTime(fwd_rec)
3906                                     + fixrec->reverseFlowDeltaMilliseconds));
3907         rwRecSetElapsed(rev_rec, (rwRecGetElapsed(fwd_rec)
3908                                   - fixrec->reverseFlowDeltaMilliseconds));
3909 
3910         /* Note: the value of the 'tcp_state' variable from above is
3911          * what is in rwRecGetTcpState(rev_rec). */
3912 
3913         /* Get reverse TCP flags from the IPFIX record if they are
3914          * available.  Otherwise, leave the flags unchanged (using
3915          * those from the forward direction). */
3916         tcp_flags = (fixrec->reverseInitialTCPFlags
3917                      | fixrec->reverseUnionTCPFlags);
3918 
3919         if (tcp_flags && IPPROTO_TCP == rwRecGetProto(fwd_rec)) {
3920             /* Flow is TCP and init|session has a value. */
3921             TRACEMSG(2, ("Using reverse TCP flags (initial|session)"));
3922             rwRecSetFlags(rev_rec, tcp_flags);
3923             rwRecSetInitFlags(rev_rec, fixrec->reverseInitialTCPFlags);
3924             rwRecSetRestFlags(rev_rec, fixrec->reverseUnionTCPFlags);
3925             tcp_state |= SK_TCPSTATE_EXPANDED;
3926         } else if (record->bmap & TMPL_BIT_reverseTcpControlBits) {
3927             /* Use whatever is in all-flags; clear any init/session
3928              * flags we got from the forward rec. */
3929             TRACEMSG(2, ("Using reverse TCP flags (all only)"));
3930             rwRecSetFlags(rev_rec, fixrec->reverseTcpControlBits);
3931             rwRecSetInitFlags(rev_rec, 0);
3932             rwRecSetRestFlags(rev_rec, 0);
3933             tcp_state &= ~SK_TCPSTATE_EXPANDED;
3934         } else if (have_tcp_stml
3935                    || (record->bmap & TMPL_BIT_reverseInitialTCPFlags))
3936         {
3937             /* If a reverseInitialTCPFlags Element existed on the
3938              * template; use it even though its value is 0. */
3939             TRACEMSG(2, ("Setting all TCP flags to 0"));
3940             rwRecSetFlags(rev_rec, 0);
3941             rwRecSetInitFlags(rev_rec, 0);
3942             rwRecSetRestFlags(rev_rec, 0);
3943             tcp_state &= ~SK_TCPSTATE_EXPANDED;
3944         }
3945         /* else leave the flags unchanged */
3946 
3947         /* Handle reverse flow attributes */
3948         if (fixrec->reverseFlowAttributes
3949             & SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE)
3950         {
3951             /* ensure it is set */
3952             tcp_state |= SK_TCPSTATE_UNIFORM_PACKET_SIZE;
3953         } else {
3954             /* ensure it it not set */
3955             tcp_state &= ~SK_TCPSTATE_UNIFORM_PACKET_SIZE;
3956         }
3957 
3958         rwRecSetTcpState(rev_rec, tcp_state);
3959     }
3960 
3961 
3962     /* all done */
3963     return ((rev_bytes > 0) ? 2 : 1);
3964 }
3965 
3966 
3967 
3968 
3969 /**
3970  *    Print a log message saying why a ski_yafrec_t record was ignored
3971  */
3972 static void
ski_yafrec_ignore(const ski_yafrec_t * yafrec,const char * reason)3973 ski_yafrec_ignore(
3974     const ski_yafrec_t *yafrec,
3975     const char         *reason)
3976 {
3977     skipaddr_t ipaddr;
3978     char sipbuf[64];
3979     char dipbuf[64];
3980 
3981     if (!SK_IPV6_IS_ZERO(yafrec->sourceIPv6Address)) {
3982 #ifdef SK_HAVE_INET_NTOP
3983         if (!inet_ntop(AF_INET6, &yafrec->sourceIPv6Address,
3984                        sipbuf, sizeof(sipbuf)))
3985 #endif
3986         {
3987             strcpy(sipbuf, "unknown-v6");
3988         }
3989     } else {
3990         skipaddrSetV4(&ipaddr, &yafrec->sourceIPv4Address);
3991         skipaddrString(sipbuf, &ipaddr, SKIPADDR_CANONICAL);
3992     }
3993     if (!SK_IPV6_IS_ZERO(yafrec->destinationIPv6Address)) {
3994 #ifdef SK_HAVE_INET_NTOP
3995         if (!inet_ntop(AF_INET6, &yafrec->destinationIPv6Address,
3996                        dipbuf, sizeof(dipbuf)))
3997 #endif
3998         {
3999             strcpy(dipbuf, "unknown-v6");
4000         }
4001     } else {
4002         skipaddrSetV4(&ipaddr, &yafrec->destinationIPv4Address);
4003         skipaddrString(dipbuf, &ipaddr, SKIPADDR_CANONICAL);
4004     }
4005 
4006     INFOMSG(("IGNORED|%s|%s|%u|%u|%u|%" PRIu64 "|%" PRIu64 "|%s|"),
4007             sipbuf, dipbuf, yafrec->sourceTransportPort,
4008             yafrec->destinationTransportPort,yafrec->protocolIdentifier,
4009             yafrec->packetDeltaCount, yafrec->octetDeltaCount, reason);
4010 }
4011 
4012 
4013 /**
4014  *    Call fBufNext() and transcode the data into one of the
4015  *    ski_yafrec_spec templates, and then convert the structure into
4016  *    0, 1, or 2 SiLK Flow records and fill the record pointers on the
4017  *    'record' structure.  The return value indicates the number of
4018  *    records converted.  Return -1 on failure.
4019  */
4020 static int
ski_yafrec_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)4021 ski_yafrec_next(
4022     fBuf_t                 *fbuf,
4023     ski_record_t           *record,
4024     const skpc_probe_t     *probe,
4025     GError                **err)
4026 {
4027     fbSubTemplateMultiListEntry_t *stml;
4028     ski_yafrec_t *yafrec;
4029     size_t len;
4030     uint8_t tcp_state;
4031     int have_tcp_stml = 0;
4032     uint16_t int_tid;
4033     rwRec *fwd_rec;
4034     rwRec *rev_rec;
4035 
4036     DEFINE_PREFIX_BUF(prefix);
4037     makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
4038                        fbSessionGetDomain(fBufGetSession(fbuf)),
4039                        record->tid, record->tmpl);
4040     TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_yafrec_next()"),
4041                  prefix, record->bmap));
4042     assert(SKI_RECTYPE_YAFREC == record->rectype);
4043 
4044     /* Get a local handle to the record and clear it */
4045     fwd_rec = record->fwd_rec;
4046     RWREC_CLEAR(fwd_rec);
4047 
4048     /* The lower 16 bits of the context is the TID of the template to
4049      * use to read the record. */
4050     int_tid = record->bmap & UINT16_MAX;
4051     if ((int_tid & SKI_YAFREC_TID) != SKI_YAFREC_TID) {
4052         TRACEMSG(1, ("ski_yafrec_next() called but TID %#06x does not match",
4053                      int_tid));
4054         return ski_ignore_next(fbuf, record, probe, err);
4055     }
4056     if (!fBufSetInternalTemplate(fbuf, int_tid, err)) {
4057         TRACEMSG(1, (("ski_yafrec_next() called but setting Template"
4058                       " TID %#06x failed: %s"), int_tid, (*err)->message));
4059         g_clear_error(err);
4060         return ski_ignore_next(fbuf, record, probe, err);
4061     }
4062     len = sizeof(record->data.yafrec);
4063     if (!fBufNext(fbuf, (uint8_t *)&record->data.yafrec, &len, err)) {
4064         return -1;
4065     }
4066     yafrec = &record->data.yafrec;
4067     assert((record->bmap & YAFREC_STML)
4068            ? (len == sizeof(ski_yafrec_t))
4069            : (len == offsetof(ski_yafrec_t, stml)));
4070 
4071     if ((yafrec->flowEndReason & SKI_END_MASK)== SKI_END_YAF_INTERMEDIATE_FLOW)
4072     {
4073         TRACEMSG(2, ("Ignored YAF intermediate uniflow"));
4074         return 0;
4075     }
4076 
4077     /* Ignore records that do not have IPv4 addresses when SiLK was
4078      * built without IPv6 support. */
4079 #if !SK_ENABLE_IPV6
4080     if (record->bmap & YAFREC_ONLY_IP6) {
4081         ski_yafrec_ignore(yafrec, "IPv6 record");
4082         return 0;
4083     }
4084 #endif  /* SK_ENABLE_IPV6 */
4085 
4086     /* Volume */
4087     if (yafrec->packetDeltaCount && yafrec->octetDeltaCount) {
4088         /* Store volume, clamping counts to 32 bits. */
4089         rwRecSetPkts(fwd_rec, CLAMP_VAL32(yafrec->packetDeltaCount));
4090         rwRecSetBytes(fwd_rec, CLAMP_VAL32(yafrec->octetDeltaCount));
4091 
4092         if (yafrec->reversePacketDeltaCount && yafrec->reverseOctetDeltaCount){
4093             TRACEMSG(1, ("Read a bi-flow yafrec record"));
4094             RWREC_CLEAR(record->rev_rec);
4095             rev_rec = record->rev_rec;
4096             rwRecSetPkts(rev_rec,CLAMP_VAL32(yafrec->reversePacketDeltaCount));
4097             rwRecSetBytes(rev_rec,CLAMP_VAL32(yafrec->reverseOctetDeltaCount));
4098         } else {
4099             TRACEMSG(1, ("Read a forward yafrec record"));
4100             rev_rec = NULL;
4101         }
4102     } else if (yafrec->reversePacketDeltaCount
4103                && yafrec->reverseOctetDeltaCount)
4104     {
4105         /* We have no forward information, only reverse.  Swap the IP
4106          * addresses, the ports, the SNMP interfaces, and the VLAN IDs
4107          * in the yafrec */
4108         ski_yafrec_t reversed;
4109 
4110         TRACEMSG(1, ("Read a reverse-only yafrec record"));
4111         rev_rec = NULL;
4112 
4113         memcpy(reversed.sourceIPv6Address, yafrec->destinationIPv6Address,
4114                sizeof(yafrec->sourceIPv6Address));
4115         memcpy(yafrec->destinationIPv6Address, yafrec->sourceIPv6Address,
4116                sizeof(yafrec->sourceIPv6Address));
4117         memcpy(yafrec->sourceIPv6Address, reversed.sourceIPv6Address,
4118                sizeof(yafrec->sourceIPv6Address));
4119 
4120         reversed.sourceIPv4Address = yafrec->destinationIPv4Address;
4121         yafrec->destinationIPv4Address = yafrec->sourceIPv4Address;
4122         yafrec->sourceIPv4Address = reversed.sourceIPv4Address;
4123 
4124         reversed.sourceTransportPort = yafrec->destinationTransportPort;
4125         yafrec->destinationTransportPort = yafrec->sourceTransportPort;
4126         yafrec->sourceTransportPort = reversed.sourceTransportPort;
4127 
4128         reversed.ingressInterface = yafrec->egressInterface;
4129         yafrec->egressInterface = yafrec->ingressInterface;
4130         yafrec->ingressInterface = reversed.ingressInterface;
4131 
4132         if (yafrec->reverseVlanId) {
4133             yafrec->vlanId = yafrec->reverseVlanId;
4134         }
4135 
4136         rwRecSetPkts(fwd_rec, CLAMP_VAL32(yafrec->reversePacketDeltaCount));
4137         rwRecSetBytes(fwd_rec, CLAMP_VAL32(yafrec->reverseOctetDeltaCount));
4138 
4139     } else {
4140         TRACEMSG(2, (("Found zero bytes or packets; byte=%" PRIu64 ", pkt="
4141                       "%" PRIu64 ", rev_byte=%" PRIu64 ", rev_pkt=%" PRIu64),
4142                      yafrec->octetDeltaCount, yafrec->packetDeltaCount,
4143                      yafrec->reverseOctetDeltaCount,
4144                      yafrec->reversePacketDeltaCount));
4145         ski_yafrec_ignore(yafrec, "byte or packet count is zero");
4146         return 0;
4147     }
4148 
4149     /* Simple fields */
4150     rwRecSetProto(fwd_rec, yafrec->protocolIdentifier);
4151     rwRecSetFlowType(fwd_rec, yafrec->silkFlowType);
4152     rwRecSetSensor(fwd_rec, yafrec->silkFlowSensor);
4153     rwRecSetApplication(fwd_rec, yafrec->silkAppLabel);
4154     if (rev_rec) {
4155         rwRecSetProto(rev_rec, yafrec->protocolIdentifier);
4156         rwRecSetFlowType(rev_rec, yafrec->silkFlowType);
4157         rwRecSetSensor(rev_rec, yafrec->silkFlowSensor);
4158         rwRecSetApplication(rev_rec, yafrec->silkAppLabel);
4159     }
4160 
4161     /* Time stamp */
4162     rwRecSetStartTime(fwd_rec, (sktime_t)yafrec->flowStartMilliseconds);
4163     if (yafrec->flowEndMilliseconds < yafrec->flowStartMilliseconds) {
4164         rwRecSetElapsed(fwd_rec, 0);
4165     } else if ((yafrec->flowEndMilliseconds - yafrec->flowStartMilliseconds)
4166                > UINT32_MAX)
4167     {
4168         rwRecSetElapsed(fwd_rec, UINT32_MAX);
4169     } else {
4170         rwRecSetElapsed(fwd_rec, (yafrec->flowEndMilliseconds
4171                                   - yafrec->flowStartMilliseconds));
4172     }
4173     if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS) {
4174         char stime_buf[SKTIMESTAMP_STRLEN];
4175         sktimestamp_r(stime_buf, rwRecGetStartTime(fwd_rec), SKTIMESTAMP_UTC);
4176         INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
4177                  " flowStartMilliseconds=%" PRIu64
4178                  ", flowEndMilliseconds=%" PRIu64),
4179                 skpcProbeGetName(probe), stime_buf,
4180                 (double)rwRecGetElapsed(fwd_rec)/1000,
4181                 yafrec->flowStartMilliseconds, yafrec->flowEndMilliseconds);
4182     }
4183 
4184     if (rev_rec) {
4185         /* Reverse flow's start time must be increased and its
4186          * duration decreased by its offset from the forward
4187          * record  */
4188         rwRecSetStartTime(rev_rec, (rwRecGetStartTime(fwd_rec)
4189                                     + yafrec->reverseFlowDeltaMilliseconds));
4190         if (rwRecGetElapsed(fwd_rec) < yafrec->reverseFlowDeltaMilliseconds) {
4191             rwRecSetElapsed(rev_rec, 0);
4192         } else {
4193             rwRecSetElapsed(rev_rec, (rwRecGetElapsed(fwd_rec)
4194                                       - yafrec->reverseFlowDeltaMilliseconds));
4195         }
4196     }
4197 
4198     /* IP Addresses */
4199 #if SK_ENABLE_IPV6
4200     /* Use the IPv6 addresses if they are the only ones present or
4201      * both addresses are present and at least one of the IPv6
4202      * addresses is non-zero. */
4203     if ((record->bmap & YAFREC_ONLY_IP6)
4204         || ((record->bmap & YAFREC_IP_BOTH)
4205             && !(SK_IPV6_IS_ZERO(yafrec->sourceIPv6Address)
4206                  && SK_IPV6_IS_ZERO(yafrec->destinationIPv6Address))))
4207     {
4208         /* Values found in IPv6 addresses--use them */
4209         rwRecSetIPv6(fwd_rec);
4210         rwRecMemSetSIPv6(fwd_rec, &yafrec->sourceIPv6Address);
4211         rwRecMemSetDIPv6(fwd_rec, &yafrec->destinationIPv6Address);
4212         rwRecMemSetNhIPv6(fwd_rec, &yafrec->ipNextHopIPv6Address);
4213         if (rev_rec) {
4214             rwRecSetIPv6(rev_rec);
4215             rwRecMemSetSIPv6(rev_rec, &yafrec->destinationIPv6Address);
4216             rwRecMemSetDIPv6(rev_rec, &yafrec->sourceIPv6Address);
4217             rwRecMemSetNhIPv6(rev_rec, &yafrec->ipNextHopIPv6Address);
4218         }
4219     } else
4220 #endif /* SK_ENABLE_IPV6 */
4221     {
4222         /* Take values from IPv4 */
4223         rwRecSetSIPv4(fwd_rec, yafrec->sourceIPv4Address);
4224         rwRecSetDIPv4(fwd_rec, yafrec->destinationIPv4Address);
4225         rwRecSetNhIPv4(fwd_rec, yafrec->ipNextHopIPv4Address);
4226         if (rev_rec) {
4227             rwRecSetSIPv4(rev_rec, yafrec->destinationIPv4Address);
4228             rwRecSetDIPv4(rev_rec, yafrec->sourceIPv4Address);
4229             rwRecSetNhIPv4(rev_rec, yafrec->ipNextHopIPv4Address);
4230         }
4231     }
4232 
4233     /* SNMP or VLAN interfaces */
4234     if (SKPC_IFVALUE_SNMP == skpcProbeGetInterfaceValueType(probe)) {
4235         rwRecSetInput(fwd_rec, CLAMP_VAL16(yafrec->ingressInterface));
4236         rwRecSetOutput(fwd_rec, CLAMP_VAL16(yafrec->egressInterface));
4237         if (rev_rec) {
4238             rwRecSetInput(rev_rec, CLAMP_VAL16(yafrec->egressInterface));
4239             rwRecSetOutput(rev_rec, CLAMP_VAL16(yafrec->ingressInterface));
4240         }
4241     } else {
4242         rwRecSetInput(fwd_rec, yafrec->vlanId);
4243         rwRecSetOutput(fwd_rec, 0);
4244         if (rev_rec) {
4245             if (record->bmap & TMPL_BIT_reverseVlanId) {
4246                 /* Reverse VLAN value exists.  Use it */
4247                 rwRecSetInput(rev_rec, yafrec->reverseVlanId);
4248                 rwRecSetOutput(rev_rec, 0);
4249             } else {
4250                 /* we have a single vlanId, so don't swap the values */
4251                 rwRecSetInput(rev_rec, yafrec->vlanId);
4252                 rwRecSetOutput(rev_rec, 0);
4253             }
4254         }
4255     }
4256 
4257     /* Attributes, ICMP Type/Code, Ports, TCP Flags */
4258     tcp_state = yafrec->silkTCPState;
4259 
4260     /* Process the flowEndReason and flowAttributes unless one of
4261      * those bits is already set (via silkTCPState). */
4262     if (!(tcp_state
4263           & (SK_TCPSTATE_FIN_FOLLOWED_NOT_ACK | SK_TCPSTATE_TIMEOUT_KILLED
4264              | SK_TCPSTATE_TIMEOUT_STARTED | SK_TCPSTATE_UNIFORM_PACKET_SIZE)))
4265     {
4266         /* Note active timeout */
4267         if ((yafrec->flowEndReason & SKI_END_MASK) == SKI_END_ACTIVE) {
4268             tcp_state |= SK_TCPSTATE_TIMEOUT_KILLED;
4269         }
4270         /* Note continuation */
4271         if (yafrec->flowEndReason & SKI_END_ISCONT) {
4272             tcp_state |= SK_TCPSTATE_TIMEOUT_STARTED;
4273         }
4274         /* Note flows with records of uniform size */
4275         if (yafrec->flowAttributes & SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE) {
4276             tcp_state |= SK_TCPSTATE_UNIFORM_PACKET_SIZE;
4277         }
4278     }
4279 
4280     if (IPPROTO_TCP != yafrec->protocolIdentifier) {
4281         /* Free STML list memory */
4282         if (record->bmap & YAFREC_STML) {
4283             fbSubTemplateMultiListClear(&yafrec->stml);
4284         }
4285 
4286         /* For TCP Flags, use whatever value was given in
4287          * tcpControlBits; ensure expanded bit in tcp_state is off. */
4288         rwRecSetFlags(fwd_rec, yafrec->tcpControlBits);
4289         tcp_state &= ~SK_TCPSTATE_EXPANDED;
4290         rwRecSetTcpState(fwd_rec, tcp_state);
4291 
4292         if (rev_rec) {
4293             /* Use reverse value if given; else foward value */
4294             if (record->bmap & TMPL_BIT_reverseTcpControlBits) {
4295                 rwRecSetFlags(rev_rec, yafrec->reverseTcpControlBits);
4296             } else {
4297                 rwRecSetFlags(rev_rec, yafrec->tcpControlBits);
4298             }
4299 
4300             /* Handle reverse flow attributes */
4301             if (yafrec->reverseFlowAttributes
4302                 & SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE)
4303             {
4304                 /* ensure it is set */
4305                 tcp_state |= SK_TCPSTATE_UNIFORM_PACKET_SIZE;
4306             } else {
4307                 /* ensure it it not set */
4308                 tcp_state &= ~SK_TCPSTATE_UNIFORM_PACKET_SIZE;
4309             }
4310             rwRecSetTcpState(rev_rec, tcp_state);
4311         }
4312 
4313         if (!rwRecIsICMP(fwd_rec)) {
4314             /* Use whatever values are in sport and dport */
4315             rwRecSetSPort(fwd_rec, yafrec->sourceTransportPort);
4316             rwRecSetDPort(fwd_rec, yafrec->destinationTransportPort);
4317             if (rev_rec) {
4318                 rwRecSetSPort(rev_rec, yafrec->destinationTransportPort);
4319                 rwRecSetDPort(rev_rec, yafrec->sourceTransportPort);
4320             }
4321         } else {
4322             /* ICMP Record */
4323             /* Store ((icmpType << 8) | icmpCode) in the dPort */
4324             rwRecSetSPort(fwd_rec, 0);
4325             if (record->bmap & TMPL_BIT_icmpTypeCodeIPv4) {
4326                 rwRecSetDPort(fwd_rec, yafrec->icmpTypeCode);
4327             } else {
4328                 rwRecSetDPort(fwd_rec, yafrec->destinationTransportPort);
4329             }
4330 
4331             if (rev_rec) {
4332                 /* use the same sPort and dPort values */
4333                 rwRecSetSPort(rev_rec, 0);
4334                 rwRecSetDPort(rev_rec, rwRecGetDPort(fwd_rec));
4335             }
4336         }
4337     } else {
4338         /* Record is TCP */
4339         rwRecSetSPort(fwd_rec, yafrec->sourceTransportPort);
4340         rwRecSetDPort(fwd_rec, yafrec->destinationTransportPort);
4341         if (rev_rec) {
4342             rwRecSetSPort(rev_rec, yafrec->destinationTransportPort);
4343             rwRecSetDPort(rev_rec, yafrec->sourceTransportPort);
4344         }
4345 
4346         if (record->bmap & YAFREC_STML) {
4347             /* The TCP flags are in a subTemplateMultiList, copy them
4348              * from the list to the record.  The yafrec->stml gets
4349              * initialized by the call to fBufNext().*/
4350             stml = NULL;
4351             while ((stml = fbSubTemplateMultiListGetNextEntry(&yafrec->stml,
4352                                                               stml)))
4353             {
4354                 if (SKI_TCP_STML_TID != stml->tmplID) {
4355                     fbSubTemplateMultiListEntryNextDataPtr(stml, NULL);
4356                 } else {
4357                     ski_tcp_stml_t *tcp = NULL;
4358                     tcp = ((ski_tcp_stml_t*)
4359                            fbSubTemplateMultiListEntryNextDataPtr(stml, tcp));
4360                     yafrec->initialTCPFlags = tcp->initialTCPFlags;
4361                     yafrec->unionTCPFlags = tcp->unionTCPFlags;
4362                     yafrec->reverseInitialTCPFlags
4363                         = tcp->reverseInitialTCPFlags;
4364                     yafrec->reverseUnionTCPFlags = tcp->reverseUnionTCPFlags;
4365                     have_tcp_stml = 1;
4366                 }
4367             }
4368             fbSubTemplateMultiListClear(&yafrec->stml);
4369         }
4370 
4371         if (yafrec->initialTCPFlags | yafrec->unionTCPFlags) {
4372             rwRecSetInitFlags(fwd_rec, yafrec->initialTCPFlags);
4373             rwRecSetRestFlags(fwd_rec, yafrec->unionTCPFlags);
4374             rwRecSetFlags(fwd_rec, (yafrec->initialTCPFlags
4375                                     | yafrec->unionTCPFlags));
4376             tcp_state |= SK_TCPSTATE_EXPANDED;
4377         } else {
4378             rwRecSetFlags(fwd_rec, yafrec->tcpControlBits);
4379             tcp_state &= ~SK_TCPSTATE_EXPANDED;
4380         }
4381         rwRecSetTcpState(fwd_rec, tcp_state);
4382 
4383         if (rev_rec) {
4384             /* Get reverse TCP flags from the IPFIX record if they are
4385              * available.  Otherwise, use those from the forward
4386              * direction. */
4387             if (yafrec->reverseInitialTCPFlags | yafrec->reverseUnionTCPFlags){
4388                 rwRecSetInitFlags(rev_rec, yafrec->reverseInitialTCPFlags);
4389                 rwRecSetRestFlags(rev_rec, yafrec->reverseUnionTCPFlags);
4390                 rwRecSetFlags(rev_rec, (yafrec->reverseInitialTCPFlags
4391                                         | yafrec->reverseUnionTCPFlags));
4392                 tcp_state |= SK_TCPSTATE_EXPANDED;
4393 
4394             } else if (record->bmap & TMPL_BIT_reverseTcpControlBits) {
4395                 /* Use whatever is in all-flags; clear any init/session
4396                  * flags we got from the forward fwd_rec. */
4397                 TRACEMSG(2, ("Using reverse TCP flags (all only)"));
4398                 rwRecSetFlags(rev_rec, yafrec->reverseTcpControlBits);
4399                 rwRecSetInitFlags(rev_rec, 0);
4400                 rwRecSetRestFlags(rev_rec, 0);
4401                 tcp_state &= ~SK_TCPSTATE_EXPANDED;
4402             } else if (have_tcp_stml
4403                        || (record->bmap & TMPL_BIT_reverseInitialTCPFlags))
4404             {
4405                 /* If a reverseInitialTCPFlags Element existed on the
4406                  * template; use it even though its value is 0. */
4407                 TRACEMSG(2, ("Setting all TCP flags to 0"));
4408                 rwRecSetFlags(rev_rec, 0);
4409                 rwRecSetInitFlags(rev_rec, 0);
4410                 rwRecSetRestFlags(rev_rec, 0);
4411                 tcp_state &= ~SK_TCPSTATE_EXPANDED;
4412             } else {
4413                 /* Use foward flags */
4414                 rwRecSetInitFlags(rev_rec, rwRecGetInitFlags(fwd_rec));
4415                 rwRecSetRestFlags(rev_rec, rwRecGetRestFlags(fwd_rec));
4416                 rwRecSetFlags(rev_rec, rwRecGetFlags(fwd_rec));
4417             }
4418 
4419             /* Handle reverse flow attributes */
4420             if (yafrec->reverseFlowAttributes
4421                 & SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE)
4422             {
4423                 /* ensure it is set */
4424                 tcp_state |= SK_TCPSTATE_UNIFORM_PACKET_SIZE;
4425             } else {
4426                 /* ensure it it not set */
4427                 tcp_state &= ~SK_TCPSTATE_UNIFORM_PACKET_SIZE;
4428             }
4429             rwRecSetTcpState(rev_rec, tcp_state);
4430         }
4431     }
4432 
4433     /* all done */
4434     return ((rev_rec) ? 2 : 1);
4435 }
4436 
4437 
4438 /**
4439  *    Print a log message saying why a ski_nf9rec_t record was ignored
4440  */
4441 static void
ski_nf9rec_ignore(const ski_record_t * record,const char * reason)4442 ski_nf9rec_ignore(
4443     const ski_record_t *record,
4444     const char         *reason)
4445 {
4446     const ski_nf9rec_t *nf9rec = &record->data.nf9rec;
4447     skipaddr_t ipaddr;
4448     char sipbuf[64];
4449     char dipbuf[64];
4450 
4451     if (record->bmap & NF9REC_IP6) {
4452 #ifdef SK_HAVE_INET_NTOP
4453         if (!inet_ntop(AF_INET6, &nf9rec->addr.ip6.sourceIPv6Address,
4454                        sipbuf, sizeof(sipbuf)))
4455 #endif
4456         {
4457             strcpy(sipbuf, "unknown-v6");
4458         }
4459 #ifdef SK_HAVE_INET_NTOP
4460         if (!inet_ntop(AF_INET6, &nf9rec->addr.ip6.destinationIPv6Address,
4461                        dipbuf, sizeof(dipbuf)))
4462 #endif
4463         {
4464             strcpy(dipbuf, "unknown-v6");
4465         }
4466     } else {
4467         skipaddrSetV4(&ipaddr, &nf9rec->addr.ip4.sourceIPv4Address);
4468         skipaddrString(sipbuf, &ipaddr, SKIPADDR_CANONICAL);
4469         skipaddrSetV4(&ipaddr, &nf9rec->addr.ip4.destinationIPv4Address);
4470         skipaddrString(dipbuf, &ipaddr, SKIPADDR_CANONICAL);
4471     }
4472 
4473     INFOMSG(("IGNORED|%s|%s|%u|%u|%u|%" PRIu64 "|%" PRIu64 "|%s|"),
4474             sipbuf, dipbuf, nf9rec->sourceTransportPort,
4475             nf9rec->destinationTransportPort,nf9rec->protocolIdentifier,
4476             nf9rec->packetDeltaCount, nf9rec->octetDeltaCount, reason);
4477 }
4478 
4479 
4480 /**
4481  *    Call fBufNext() and transcode the data into one of the
4482  *    ski_nf9rec_spec templates, and then convert the structure into
4483  *    0, 1, or 2 SiLK Flow records and fill the record pointers on the
4484  *    'record' structure.  The return value indicates the number of
4485  *    records converted.  Return -1 on failure.
4486  */
4487 static int
ski_nf9rec_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)4488 ski_nf9rec_next(
4489     fBuf_t                 *fbuf,
4490     ski_record_t           *record,
4491     const skpc_probe_t     *probe,
4492     GError                **err)
4493 {
4494     char stime_buf[SKTIMESTAMP_STRLEN];
4495     ski_nf9rec_t *nf9rec;
4496     size_t len;
4497     uint16_t int_tid;
4498     rwRec *fwd_rec;
4499     rwRec *rev_rec;
4500 
4501     DEFINE_PREFIX_BUF(prefix);
4502     makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
4503                        fbSessionGetDomain(fBufGetSession(fbuf)),
4504                        record->tid, record->tmpl);
4505     TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_nf9rec_next()"),
4506                  prefix, record->bmap));
4507     assert(SKI_RECTYPE_NF9REC == record->rectype);
4508 
4509     /* Get a local handle to the record and clear it */
4510     fwd_rec = record->fwd_rec;
4511     RWREC_CLEAR(fwd_rec);
4512     rev_rec = NULL;
4513 
4514     /* The lower 16 bits of the context is the TID of the template to
4515      * use to read the record. */
4516     int_tid = record->bmap & UINT16_MAX;
4517     if ((int_tid & SKI_NF9REC_TID) != SKI_NF9REC_TID) {
4518         TRACEMSG(1, ("ski_nf9rec_next() called but TID %#06x does not match",
4519                      int_tid));
4520         return ski_ignore_next(fbuf, record, probe, err);
4521     }
4522     if (!fBufSetInternalTemplate(fbuf, int_tid, err)) {
4523         TRACEMSG(1, (("ski_nf9rec_next() called but setting Template"
4524                       " TID %#06x failed: %s"), int_tid, (*err)->message));
4525         g_clear_error(err);
4526         return ski_ignore_next(fbuf, record, probe, err);
4527     }
4528     len = sizeof(record->data.nf9rec);
4529     if (!fBufNext(fbuf, (uint8_t *)&record->data.nf9rec, &len, err)) {
4530         return -1;
4531     }
4532     assert(len == sizeof(ski_nf9rec_t));
4533     nf9rec = &record->data.nf9rec;
4534 
4535     /* Ignore records that do not have IPv4 addresses when SiLK was
4536      * built without IPv6 support. */
4537 #if !SK_ENABLE_IPV6
4538     if (record->bmap & NF9REC_IP6) {
4539         ski_nf9rec_ignore(record, "IPv6 record");
4540         return 0;
4541     }
4542 #endif  /* SK_ENABLE_IPV6 */
4543 
4544     /* When the nf9-out-is-reverse quirk is set, flip a bit on the
4545      * record's bitmap so volume is treated as initiator/responder. */
4546     if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_NF9_OUT_IS_REVERSE) {
4547         TRACEMSG(2, (("Modifying record bmap from " BMAP_PRI " to " BMAP_PRI
4548                       " due to nf9-out-is-reverse"),
4549                      record->bmap, record->bmap | NF9REC_INITIATOR));
4550         record->bmap |= NF9REC_INITIATOR;
4551     }
4552 
4553     /* Handle the firewall settings and check for reverse (responder)
4554      * volume.  See the big comment in ski_fixrec_next() for all the
4555      * gory details on firewall rules. */
4556     if (record->bmap & (TMPL_BIT_firewallEvent | TMPL_BIT_NF_F_FW_EVENT
4557                         | TMPL_BIT_NF_F_FW_EXT_EVENT))
4558     {
4559         /* Handle firewall events */
4560         char msg[64];
4561         uint8_t event = (nf9rec->firewallEvent
4562                          ? nf9rec->firewallEvent : nf9rec->NF_F_FW_EVENT);
4563         if (SKIPFIX_FW_EVENT_DENIED == event) {
4564             /* flow denied; there should be no reverse record */
4565             TRACEMSG(1, (("Processing flow denied event as actual flow record;"
4566                           " firewallEvent=%u, NF_F_FW_EVENT=%u,"
4567                           " NF_F_FW_EXT_EVENT=%u"),
4568                          nf9rec->firewallEvent, nf9rec->NF_F_FW_EVENT,
4569                          nf9rec->NF_F_FW_EXT_EVENT));
4570             if (SKIPFIX_FW_EVENT_DENIED_CHECK_VALID(nf9rec->NF_F_FW_EXT_EVENT))
4571             {
4572                 rwRecSetMemo(fwd_rec, nf9rec->NF_F_FW_EXT_EVENT);
4573             } else {
4574                 rwRecSetMemo(fwd_rec, event);
4575             }
4576             /* flow denied events from the Cisco ASA typically have
4577              * zero in the bytes and packets field */
4578             if (nf9rec->octetDeltaCount) {
4579                 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4580                 if (nf9rec->packetDeltaCount) {
4581                     rwRecSetPkts(
4582                         fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4583                 } else {
4584                     TRACEMSG(1, ("Setting forward packets to 1"
4585                                  " for denied firewall event"));
4586                     rwRecSetPkts(fwd_rec, 1);
4587                 }
4588             } else if (nf9rec->postOctetDeltaCount
4589                        && !(record->bmap & NF9REC_INITIATOR))
4590             {
4591                 /* postOctet value is non-zero and it is not
4592                  * responderOctets; use in place of standard value */
4593                 rwRecSetBytes(fwd_rec,
4594                               CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4595                 if (nf9rec->postPacketDeltaCount) {
4596                     rwRecSetPkts(fwd_rec,
4597                                  CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4598                 } else {
4599                     TRACEMSG(1, ("Setting forward packets to 1 for denied"
4600                                  " firewall event (postOctets non-zero)"));
4601                     rwRecSetPkts(fwd_rec, 1);
4602                 }
4603             } else if (nf9rec->packetDeltaCount) {
4604                 TRACEMSG(1, ("Setting forward bytes equal to packets value"
4605                              " for denied firewall event"));
4606                 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4607                 rwRecSetPkts(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4608             } else {
4609                 TRACEMSG(1, ("Setting forward bytes and packets to 1"
4610                              " for denied firewall event"));
4611                 rwRecSetBytes(fwd_rec, 1);
4612                 rwRecSetPkts(fwd_rec, 1);
4613             }
4614 
4615         } else if (SKIPFIX_FW_EVENT_DELETED != event) {
4616             /* flow created, flow updated, flow alert, or something
4617              * unexpected. These are ignored */
4618             if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_FIREWALL) {
4619                 snprintf(msg, sizeof(msg), "firewallEvent=%u,extended=%u",
4620                          event, nf9rec->NF_F_FW_EXT_EVENT);
4621                 ski_nf9rec_ignore(record, msg);
4622             }
4623             return 0;
4624         } else {
4625             /* flow deleted */
4626             TRACEMSG(1,(("Processing flow deleted event as actual flow record;"
4627                          " firewallEvent=%u, NF_F_FW_EVENT=%u,"
4628                          " NF_F_FW_EXT_EVENT=%u"),
4629                         nf9rec->firewallEvent, nf9rec->NF_F_FW_EVENT,
4630                         nf9rec->NF_F_FW_EXT_EVENT));
4631             /* these normally have a byte count, but not always */
4632             if (nf9rec->octetDeltaCount) {
4633                 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4634                 if (nf9rec->packetDeltaCount) {
4635                     rwRecSetPkts(
4636                         fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4637                 } else {
4638                     TRACEMSG(1, ("Setting forward packets to 1"
4639                                  " for deleted firewall event"));
4640                     rwRecSetPkts(fwd_rec, 1);
4641                 }
4642             } else if (nf9rec->postOctetDeltaCount
4643                        && !(record->bmap & NF9REC_INITIATOR))
4644             {
4645                 /* postOctet value is non-zero and it is not
4646                  * responderOctets; use in place of standard value */
4647                 rwRecSetBytes(fwd_rec,
4648                               CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4649                 if (nf9rec->postPacketDeltaCount) {
4650                     rwRecSetPkts(fwd_rec,
4651                                  CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4652                 } else {
4653                     TRACEMSG(1, ("Setting forward packets to 1 for deleted"
4654                                  " firewall event (postOctets non-zero)"));
4655                     rwRecSetPkts(fwd_rec, 1);
4656                 }
4657             } else if (nf9rec->packetDeltaCount) {
4658                 TRACEMSG(1, ("Setting forward bytes equal to packets value"
4659                              " for deleted firewall event"));
4660                 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4661                 rwRecSetPkts(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4662             } else {
4663                 TRACEMSG(1, ("Setting forward bytes and packets to 1"
4664                              " for deleted firewall event"));
4665                 rwRecSetBytes(fwd_rec, 1);
4666                 rwRecSetPkts(fwd_rec, 1);
4667             }
4668 
4669             /* handle reverse record */
4670             if (!(record->bmap & NF9REC_INITIATOR)) {
4671                 /* There is no reverse data */
4672             } else if (nf9rec->postOctetDeltaCount) {
4673                 /* there is a reverse byte count: postOctet and
4674                  * postPacket members hold responder values */
4675                 RWREC_CLEAR(record->rev_rec);
4676                 rev_rec = record->rev_rec;
4677                 rwRecSetBytes(
4678                     rev_rec, CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4679                 if (nf9rec->postPacketDeltaCount) {
4680                     rwRecSetPkts(
4681                         rev_rec, CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4682                 } else {
4683                     TRACEMSG(1, ("Setting reverse packets to 1"
4684                                  " for deleted firewall event"));
4685                     rwRecSetPkts(rev_rec, 1);
4686                 }
4687             } else if (nf9rec->postPacketDeltaCount) {
4688                 /* there is a reverse packet count */
4689                 RWREC_CLEAR(record->rev_rec);
4690                 rev_rec = record->rev_rec;
4691                 TRACEMSG(1, ("Setting reverse bytes equal to packets value"
4692                              " for deleted firewall event"));
4693                 rwRecSetBytes(
4694                     rev_rec, CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4695                 rwRecSetPkts(
4696                     rev_rec, CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4697             }
4698             /* else no reverse record */
4699         }
4700     } else if (!(record->bmap & NF9REC_INITIATOR)) {
4701         /* there is no firewall event data and no reverse data; set
4702          * forward data */
4703         if (nf9rec->octetDeltaCount) {
4704             /* use the forward octet count which is non-zero */
4705             if (nf9rec->packetDeltaCount) {
4706                 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4707                 rwRecSetPkts(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4708             } else if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_ZERO_PACKETS) {
4709                 TRACEMSG(1, ("Setting forward packets to 1"
4710                              " outside of firewall event handler"));
4711                 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4712                 rwRecSetPkts(fwd_rec, 1);
4713             } else {
4714                 ski_nf9rec_ignore(record, "No forward packets");
4715                 return 0;
4716             }
4717         } else if (nf9rec->postOctetDeltaCount) {
4718             /* postOctet value is non-zero and it is not
4719              * responderOctets; use in place of standard value */
4720             if (nf9rec->postPacketDeltaCount) {
4721                 rwRecSetBytes(fwd_rec,
4722                               CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4723                 rwRecSetPkts(fwd_rec,
4724                              CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4725             } else if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_ZERO_PACKETS) {
4726                 TRACEMSG(1, ("Setting forward packets to 1"
4727                              " outside of firewall event handler"));
4728                 rwRecSetBytes(fwd_rec,
4729                               CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4730                 rwRecSetPkts(fwd_rec, 1);
4731             } else {
4732                 ski_nf9rec_ignore(record, "No forward packets");
4733                 return 0;
4734             }
4735         } else {
4736             ski_nf9rec_ignore(record, "No forward octets");
4737             return 0;
4738         }
4739     } else if (nf9rec->octetDeltaCount) {
4740         /* the template included initiatorOctets & responderOctets and
4741          * there is forward volume */
4742         if (nf9rec->packetDeltaCount) {
4743             rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4744             rwRecSetPkts(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4745         } else if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_ZERO_PACKETS) {
4746             TRACEMSG(1, ("Setting forward packets to 1"
4747                          " outside of firewall event handler"));
4748             rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4749             rwRecSetPkts(fwd_rec, 1);
4750         } else {
4751             ski_nf9rec_ignore(record, "No forward packets");
4752             return 0;
4753         }
4754         if (nf9rec->postOctetDeltaCount) {
4755             /* there is a reverse byte count (responderOctets) */
4756             if (nf9rec->postPacketDeltaCount) {
4757                 RWREC_CLEAR(record->rev_rec);
4758                 rev_rec = record->rev_rec;
4759                 rwRecSetBytes(
4760                     rev_rec, CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4761                 rwRecSetPkts(
4762                     rev_rec, CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4763             } else if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_ZERO_PACKETS) {
4764                 RWREC_CLEAR(record->rev_rec);
4765                 rev_rec = record->rev_rec;
4766                 TRACEMSG(1, ("Setting reverse packets to 1"
4767                              " outside of firewall event handler"));
4768                 rwRecSetBytes(
4769                     rev_rec, CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4770                 rwRecSetPkts(rev_rec, 1);
4771             } else {
4772                 TRACEMSG(
4773                     1, ("Ignoring reverse bytes since no reverse packets"));
4774             }
4775         }
4776     } else if (nf9rec->postOctetDeltaCount) {
4777         /* reverse only record */
4778         ski_nf9rec_ignore(record,
4779                           "No forward octets (reverse octets are non-zero)");
4780         return 0;
4781     } else {
4782         ski_nf9rec_ignore(record, "No forward/reverse octets");
4783         return 0;
4784     }
4785 
4786     TRACEMSG(1, ("Read a %s nf9rec record", ((rev_rec)?"bi-flow":"forward")));
4787 
4788     /* Handle the IP addresses */
4789 #if SK_ENABLE_IPV6
4790     if (record->bmap & NF9REC_IP6) {
4791         rwRecSetIPv6(fwd_rec);
4792         rwRecMemSetSIPv6(fwd_rec, nf9rec->addr.ip6.sourceIPv6Address);
4793         rwRecMemSetDIPv6(fwd_rec, nf9rec->addr.ip6.destinationIPv6Address);
4794         rwRecMemSetNhIPv6(fwd_rec, nf9rec->addr.ip6.ipNextHopIPv6Address);
4795         if (rev_rec) {
4796             rwRecSetIPv6(rev_rec);
4797             rwRecMemSetSIPv6(rev_rec, &nf9rec->addr.ip6.destinationIPv6Address);
4798             rwRecMemSetDIPv6(rev_rec, &nf9rec->addr.ip6.sourceIPv6Address);
4799             rwRecMemSetNhIPv6(rev_rec, &nf9rec->addr.ip6.ipNextHopIPv6Address);
4800         }
4801     } else
4802 #endif /* SK_ENABLE_IPV6 */
4803     {
4804         /* Take values from IPv4 */
4805         rwRecSetSIPv4(fwd_rec, nf9rec->addr.ip4.sourceIPv4Address);
4806         rwRecSetDIPv4(fwd_rec, nf9rec->addr.ip4.destinationIPv4Address);
4807         rwRecSetNhIPv4(fwd_rec, nf9rec->addr.ip4.ipNextHopIPv4Address);
4808         if (rev_rec) {
4809             rwRecSetSIPv4(rev_rec, nf9rec->addr.ip4.destinationIPv4Address);
4810             rwRecSetDIPv4(rev_rec, nf9rec->addr.ip4.sourceIPv4Address);
4811             rwRecSetNhIPv4(rev_rec, nf9rec->addr.ip4.ipNextHopIPv4Address);
4812         }
4813     }
4814 
4815     /* Time stamp */
4816     if (record->bmap & NF9REC_MILLI) {
4817         if (0 == nf9rec->t.milli.flowStartMilliseconds) {
4818             rwRecSetStartTime(
4819                 fwd_rec, (sktime_t)nf9rec->t.milli.flowEndMilliseconds);
4820             rwRecSetElapsed(fwd_rec, 0);
4821         } else {
4822             int64_t dur = (nf9rec->t.milli.flowEndMilliseconds
4823                            - nf9rec->t.milli.flowStartMilliseconds);
4824             rwRecSetStartTime(
4825                 fwd_rec, (sktime_t)nf9rec->t.milli.flowStartMilliseconds);
4826             if (dur < 0) {
4827                 rwRecSetElapsed(fwd_rec, 0);
4828             } else if (dur > (int64_t)UINT32_MAX) {
4829                 rwRecSetElapsed(fwd_rec, UINT32_MAX);
4830             } else {
4831                 rwRecSetElapsed(fwd_rec, (uint32_t)dur);
4832             }
4833         }
4834         if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS) {
4835             sktimestamp_r(stime_buf, rwRecGetStartTime(fwd_rec),
4836                           SKTIMESTAMP_UTC);
4837             INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
4838                      " flowStartMilliseconds=%" PRIu64
4839                      ", flowEndMilliseconds=%" PRIu64),
4840                     skpcProbeGetName(probe), stime_buf,
4841                     (double)rwRecGetElapsed(fwd_rec)/1000,
4842                     nf9rec->t.milli.flowStartMilliseconds,
4843                     nf9rec->t.milli.flowEndMilliseconds);
4844         }
4845     } else {
4846         /* Times based on flow generator system uptimes (Netflow v9) */
4847         intmax_t uptime, difference;
4848         sktime_t export_msec;
4849         const char *rollover_first;
4850         const char *rollover_last = "";
4851 
4852         assert(record->bmap & NF9REC_SYSUP);
4853 
4854         /* Compute the uptime: systemInitTimeMilliseconds is the
4855          * absolute router boot time (msec), and libfixbuf sets it by
4856          * subtracting the NFv9 uptime (msec) from the record's
4857          * absolute export time (sec). */
4858         export_msec = sktimeCreate(fBufGetExportTime(fbuf), 0);
4859         uptime = export_msec - nf9rec->t.sysup.systemInitTimeMilliseconds;
4860         if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_NF9_SYSUPTIME_SECS) {
4861             /* uptime was reported in seconds, not msec */
4862             TRACEMSG(2, (("Before adjustment: exportTimeMillisec %" PRIu64
4863                           ", initTimeMillisec %" PRIu64 ", uptime %" PRIdMAX
4864                           ", startUpTime %" PRIu32 ", endUpTime %" PRIu32
4865                           ", packets %" PRIu32),
4866                          export_msec,
4867                          nf9rec->t.sysup.systemInitTimeMilliseconds,
4868                          uptime, nf9rec->t.sysup.flowStartSysUpTime,
4869                          nf9rec->t.sysup.flowEndSysUpTime,
4870                          rwRecGetPkts(fwd_rec)));
4871             uptime *= 1000;
4872             nf9rec->t.sysup.systemInitTimeMilliseconds = export_msec - uptime;
4873             if (rwRecGetPkts(fwd_rec) == 1
4874                 && (nf9rec->t.sysup.flowEndSysUpTime
4875                     < nf9rec->t.sysup.flowStartSysUpTime))
4876             {
4877                 /* sometimes the end time for single packet flows is
4878                  * very different than the start time. */
4879                 nf9rec->t.sysup.flowEndSysUpTime
4880                     = nf9rec->t.sysup.flowStartSysUpTime;
4881             }
4882         }
4883 
4884         /* Compute duration */
4885         if (nf9rec->t.sysup.flowStartSysUpTime
4886             <= nf9rec->t.sysup.flowEndSysUpTime)
4887         {
4888             rwRecSetElapsed(fwd_rec, (nf9rec->t.sysup.flowEndSysUpTime
4889                                       - nf9rec->t.sysup.flowStartSysUpTime));
4890         } else {
4891             /* assume EndTime rolled-over and start did not */
4892             rwRecSetElapsed(
4893                 fwd_rec, (ROLLOVER32 + nf9rec->t.sysup.flowEndSysUpTime
4894                           - nf9rec->t.sysup.flowStartSysUpTime));
4895             rollover_last = ", assume flowEndSysUpTime rollover";
4896         }
4897         /* Compute uptime, checking for rollover */
4898         difference = uptime - nf9rec->t.sysup.flowStartSysUpTime;
4899         if (difference > MAXIMUM_FLOW_TIME_DEVIATION) {
4900             /* assume upTime is set before record is composed and
4901              * that start-time has rolled over. */
4902             rwRecSetStartTime(
4903                 fwd_rec, (nf9rec->t.sysup.systemInitTimeMilliseconds
4904                           + nf9rec->t.sysup.flowStartSysUpTime
4905                           + ROLLOVER32));
4906             rollover_first = ", assume flowStartSysUpTime rollover";
4907         } else if (-difference > MAXIMUM_FLOW_TIME_DEVIATION) {
4908             /* assume upTime is set after record is composed and
4909              * that upTime has rolled over. */
4910             rwRecSetStartTime(
4911                 fwd_rec, (nf9rec->t.sysup.systemInitTimeMilliseconds
4912                           + nf9rec->t.sysup.flowStartSysUpTime
4913                           - ROLLOVER32));
4914             rollover_first = ", assume sysUpTime rollover";
4915         } else {
4916             /* times look reasonable; assume no roll over */
4917             rwRecSetStartTime(
4918                 fwd_rec, (nf9rec->t.sysup.systemInitTimeMilliseconds
4919                           + nf9rec->t.sysup.flowStartSysUpTime));
4920             rollover_first = "";
4921         }
4922         if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS) {
4923             sktimestamp_r(
4924                 stime_buf, rwRecGetStartTime(fwd_rec), SKTIMESTAMP_UTC);
4925             INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
4926                      " flowStartSysUpTime=%" PRIu32
4927                      ", flowEndSysUpTime=%" PRIu32
4928                      ", systemInitTimeMilliseconds=%" PRIu64
4929                      ", exportTimeSeconds=%" PRIu32
4930                      ", calculated sysUpTime=%" PRIdMAX "%s%s"),
4931                     skpcProbeGetName(probe),
4932                     stime_buf, (double)rwRecGetElapsed(fwd_rec)/1000,
4933                     nf9rec->t.sysup.flowStartSysUpTime,
4934                     nf9rec->t.sysup.flowEndSysUpTime,
4935                     nf9rec->t.sysup.systemInitTimeMilliseconds,
4936                     fBufGetExportTime(fbuf), uptime,
4937                     rollover_first, rollover_last);
4938         }
4939     }
4940 
4941     /* SNMP or VLAN interfaces */
4942     if (SKPC_IFVALUE_SNMP == skpcProbeGetInterfaceValueType(probe)) {
4943         rwRecSetInput(fwd_rec, CLAMP_VAL16(nf9rec->ingressInterface));
4944         rwRecSetOutput(fwd_rec, CLAMP_VAL16(nf9rec->egressInterface));
4945     } else {
4946         rwRecSetInput(fwd_rec, nf9rec->vlanId);
4947         rwRecSetOutput(fwd_rec, nf9rec->postVlanId);
4948     }
4949 
4950     /* Check for active timeout flag in the flowEndReason */
4951     if ((nf9rec->flowEndReason & SKI_END_MASK) == SKI_END_ACTIVE) {
4952         rwRecSetTcpState(fwd_rec, SK_TCPSTATE_TIMEOUT_KILLED);
4953     }
4954 
4955     rwRecSetProto(fwd_rec, nf9rec->protocolIdentifier);
4956 
4957     /* For TCP Flags, use whatever value was given in tcpControlBits,
4958      * regardless of protocol */
4959     rwRecSetFlags(fwd_rec, nf9rec->tcpControlBits);
4960     if (!rwRecIsICMP(fwd_rec)) {
4961         /* Use whatever values are in sport and dport, regardless of
4962          * protocol */
4963         rwRecSetSPort(fwd_rec, nf9rec->sourceTransportPort);
4964         rwRecSetDPort(fwd_rec, nf9rec->destinationTransportPort);
4965         if (rev_rec) {
4966             rwRecSetSPort(rev_rec, nf9rec->destinationTransportPort);
4967             rwRecSetDPort(rev_rec, nf9rec->sourceTransportPort);
4968         }
4969     } else {
4970         /* ICMP Record */
4971         /* Store ((icmpType << 8) | icmpCode) in the dPort if
4972          * available; else use the dport */
4973         rwRecSetSPort(fwd_rec, 0);
4974         if (record->bmap & TMPL_BIT_icmpTypeCodeIPv4) {
4975             rwRecSetDPort(fwd_rec, nf9rec->icmpTypeCode);
4976         } else if (record->bmap & TMPL_BIT_icmpTypeIPv4) {
4977             rwRecSetDPort(fwd_rec,((nf9rec->icmpType << 8)| nf9rec->icmpCode));
4978         } else {
4979             rwRecSetDPort(fwd_rec, nf9rec->destinationTransportPort);
4980         }
4981         if (rev_rec) {
4982             rwRecSetSPort(rev_rec, 0);
4983             rwRecSetDPort(rev_rec, rwRecGetDPort(fwd_rec));
4984         }
4985     }
4986 
4987     if (rev_rec) {
4988         rwRecSetStartTime(rev_rec, rwRecGetStartTime(fwd_rec));
4989         rwRecSetElapsed(rev_rec, rwRecGetElapsed(fwd_rec));
4990         rwRecSetInput(rev_rec, rwRecGetOutput(fwd_rec));
4991         rwRecSetOutput(rev_rec, rwRecGetInput(fwd_rec));
4992         rwRecSetTcpState(rev_rec, rwRecGetTcpState(fwd_rec));
4993         rwRecSetProto(fwd_rec, nf9rec->protocolIdentifier);
4994     }
4995 
4996     /* all done */
4997     return ((rev_rec) ? 2 : 1);
4998 }
4999 
5000 
5001 /*
5002  *    Helper function for ipfix_reader().
5003  *
5004  *    Handle the result of converting an IPFIX record to SiLK Flow
5005  *    records on 'source': update statistics, store the reverse record
5006  *    (if any) into the circular buffer, and move to the next location
5007  *    in the circular buffer.  The expected values for 'read_result'
5008  *    are 0 (record ignored), 1 (uni-flow), and 2 (bi-flow).
5009  */
5010 static void
ipfix_reader_update_circbuf(skIPFIXSource_t * source,int read_result)5011 ipfix_reader_update_circbuf(
5012     skIPFIXSource_t    *source,
5013     int                 read_result)
5014 {
5015 #if !SOURCE_LOG_MAX_PENDING_WRITE
5016 #define circbuf_count_addr  NULL
5017 #else
5018 #define circbuf_count_addr  &circbuf_count
5019     uint32_t circbuf_count;
5020 #endif
5021 
5022     switch (read_result) {
5023       case 0:
5024         /* Ignore record */
5025         pthread_mutex_lock(&source->stats_mutex);
5026         ++source->ignored_flows;
5027         pthread_mutex_unlock(&source->stats_mutex);
5028         break;
5029 
5030       case 1:
5031         /* We have filled the empty source->current_record slot.
5032          * Advance to the next record location.  */
5033         if (skCircBufGetWriterBlock(
5034                 source->circbuf, &source->current_record, circbuf_count_addr))
5035         {
5036             assert(source->stopped);
5037             break;
5038         }
5039         pthread_mutex_lock(&source->stats_mutex);
5040         ++source->forward_flows;
5041 #if SOURCE_LOG_MAX_PENDING_WRITE
5042         if (circbuf_count > source->max_pending) {
5043             source->max_pending = circbuf_count;
5044         }
5045 #endif
5046         pthread_mutex_unlock(&source->stats_mutex);
5047         break;
5048 
5049       case 2:
5050         /* copy reverse record into the circular buf */
5051         if (skCircBufGetWriterBlock(
5052                 source->circbuf, &source->current_record, NULL))
5053         {
5054             assert(source->stopped);
5055             break;
5056         }
5057         memcpy(source->current_record, &source->rvbuf, sizeof(source->rvbuf));
5058         if (skCircBufGetWriterBlock(
5059                 source->circbuf, &source->current_record, circbuf_count_addr))
5060         {
5061             assert(source->stopped);
5062             break;
5063         }
5064         pthread_mutex_lock(&source->stats_mutex);
5065         ++source->forward_flows;
5066         ++source->reverse_flows;
5067 #if SOURCE_LOG_MAX_PENDING_WRITE
5068         if (circbuf_count > source->max_pending) {
5069             source->max_pending = circbuf_count;
5070         }
5071 #endif
5072         pthread_mutex_unlock(&source->stats_mutex);
5073         break;
5074 
5075       default:
5076         skAbortBadCase(read_result);
5077     }
5078 }
5079 
5080 
5081 /*
5082  *    THREAD ENTRY POINT
5083  *
5084  *    The ipfix_reader() function is the main thread for listening to
5085  *    data from a single fbListener_t object.  It is passed the
5086  *    skIPFIXSourceBase_t object containing that fbListener_t object.
5087  *    This thread is started from the ipfixSourceCreateFromSockaddr()
5088  *    function.
5089  */
5090 void *
ipfix_reader(void * vsource_base)5091 ipfix_reader(
5092     void               *vsource_base)
5093 {
5094 #define IS_UDP (base->connspec->transport == FB_UDP)
5095     skIPFIXSourceBase_t *base = (skIPFIXSourceBase_t *)vsource_base;
5096     skIPFIXConnection_t *conn = NULL;
5097     skIPFIXSource_t *source = NULL;
5098     GError *err = NULL;
5099     fBuf_t *fbuf = NULL;
5100     int rv;
5101 
5102     TRACE_ENTRY;
5103 
5104     /* Ignore all signals */
5105     skthread_ignore_signals();
5106 
5107     /* Communicate that the thread has started */
5108     pthread_mutex_lock(&base->mutex);
5109     pthread_cond_signal(&base->cond);
5110     base->started = 1;
5111     base->running = 1;
5112     DEBUGMSG("fixbuf listener started for [%s]:%s",
5113              base->connspec->host ? base->connspec->host : "*",
5114              base->connspec->svc);
5115     pthread_mutex_unlock(&base->mutex);
5116 
5117     TRACEMSG(3, ("base %p started for [%s]:%s",
5118                  base, base->connspec->host ? base->connspec->host : "*",
5119                  base->connspec->svc));
5120 
5121     /* Loop until destruction of the base object */
5122     while (!base->destroyed) {
5123 
5124         /* wait for a new connection */
5125         fbuf = fbListenerWait(base->listener, &err);
5126         if (NULL == fbuf) {
5127             if (NULL == err) {
5128                 /* got an unknown error---treat as fatal */
5129                 NOTICEMSG("fixbuf listener shutting down:"
5130                           " unknown error from fbListenerWait");
5131                 break;
5132             }
5133 
5134             if (g_error_matches(err,SK_IPFIXSOURCE_DOMAIN,SK_IPFIX_ERROR_CONN))
5135             {
5136                 /* the callback rejected the connection (TCP only) */
5137                 DEBUGMSG("fixbuf listener rejected connection: %s",
5138                          err->message);
5139                 g_clear_error(&err);
5140                 continue;
5141             }
5142 
5143             /* FB_ERROR_NLREAD indicates interrupted read, either
5144              * because the socket received EINTR or because
5145              * fbListenerInterrupt() was called.
5146              *
5147              * FB_ERROR_EOM indicates an end-of-message, and needs to
5148              * be ignored when running in manual mode. */
5149             if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD)
5150                 || g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_EOM))
5151             {
5152                 TRACEMSG(1, (("fixbuf listener received %s"
5153                               " while waiting for a connection: %s"),
5154                              ((FB_ERROR_EOM == err->code)
5155                               ? "end-of-message" : "interrupted read"),
5156                              err->message));
5157                 g_clear_error(&err);
5158                 continue;
5159             }
5160 
5161             /* treat any other error as fatal */
5162             NOTICEMSG(("fixbuf listener shutting down: %s"
5163                        " (d=%" PRIu32 ",c=%" PRId32 ")"),
5164                       err->message, (uint32_t)err->domain, (int32_t)err->code);
5165             g_clear_error(&err);
5166             break;
5167         }
5168 
5169         /* Make sure the fbuf is in manual mode.  Manual mode is
5170          * required to multiplex among multiple collectors using
5171          * fbListenerWait().  Without this, fBufNext() blocks once the
5172          * buffer is empty until it has messages again.  Instead, we
5173          * want to switch to a different fbuf once we read all records
5174          * in the current buffer. */
5175         fBufSetAutomaticMode(fbuf, 0);
5176 
5177 #if 0
5178         /* Added #if 0 since this should not be needed; the callback
5179          * is added to the session when the session is allocated. */
5180 
5181         /* Invoke a callback when a new template arrives that tells
5182          * fixbuf how to map from the subTemplateMultiList used by YAF
5183          * for TCP information to our internal strucure. */
5184         skiAddSessionCallback(fBufGetSession(fbuf));
5185 #endif  /* 0 */
5186 
5187         /* Loop over fBufNext() until the buffer empties, we begin to
5188          * shutdown, or there is an error.  All the ski_*_next()
5189          * functions call fBufNext() internally. */
5190         conn = NULL;
5191         while (!base->destroyed) {
5192             ski_rectype_t rectype;
5193             ski_record_t record;
5194 
5195             /* Determine what type of record is next; this calls
5196              * fBufNextCollectionTemplate(), and gives error at end of
5197              * message */
5198             rectype = ski_rectype_next(fbuf, &record, &err);
5199 
5200             if (!conn) {
5201                 /* Get the connection data associated with this fBuf_t
5202                  * object.  In manual mode this loop processes a
5203                  * single msg, which must have a single source. */
5204                 conn = ((skIPFIXConnection_t *)
5205                         fbCollectorGetContext(fBufGetCollector(fbuf)));
5206                 if (conn == NULL) {
5207                     /* If conn is NULL, we must have rejected a UDP
5208                      * connection from the appInit function. */
5209                     assert(rectype == SKI_RECTYPE_ERROR);
5210                     TRACEMSG(2, ("<UNKNOWN>: %s", ski_rectype_name[rectype]));
5211                     break;
5212                 }
5213                 source = conn->source;
5214                 assert(source != NULL);
5215 
5216                 TRACEMSG(5, ("'%s': conn = %p, source = %p, fbuf = %p",
5217                              source->name, conn, source, fbuf));
5218 
5219                 /* If this source is stopped, end the connection. If
5220                  * source is told to stop while processing msg, the
5221                  * circbuf will inform us. */
5222                 if (source->stopped) {
5223                     TRACEMSG(1, (("'%s': Closing connection since"
5224                                   " source is stopping"), source->name));
5225                     if (!IS_UDP) {
5226                         fBufFree(fbuf);
5227                         fbuf = NULL;
5228                     }
5229                     if (rectype == SKI_RECTYPE_ERROR) {
5230                         g_clear_error(&err);
5231                     }
5232                     break;
5233                 }
5234             }
5235 
5236             /* There is a 'break' statement after this switch(), so
5237              * any "normal" event (no error condition and buffer is
5238              * not empty) must use 'continue' after processing to
5239              * continue the loop.  Any 'break' indicates an error. */
5240             switch (rectype) {
5241               case SKI_RECTYPE_ERROR:
5242                 TRACEMSG(2, ("'%s': %s",
5243                              source->name, ski_rectype_name[rectype]));
5244                 break;          /* error */
5245 
5246               case SKI_RECTYPE_IGNORE:
5247                 /* An unknown/ignored template */
5248                 if (!ski_ignore_next(fbuf, &record, source->probe, &err)){
5249                     /* should have been able to read something */
5250                     TRACEMSG(2, ("'%s': %s and ski_ignore_next() is FALSE",
5251                                  source->name, ski_rectype_name[rectype]));
5252                     break;      /* error */
5253                 }
5254                 continue;
5255 
5256               case SKI_RECTYPE_YAFSTATS:
5257                 if (!ski_yafstats_next(fbuf, &record, source->probe, &err)){
5258                     /* should have been able to read the stats */
5259                     TRACEMSG(2, ("'%s': %s and ski_yafstats_next() is FALSE",
5260                                  source->name, ski_rectype_name[rectype]));
5261                     break;      /* error */
5262                 }
5263                 ski_yafstats_update_source(
5264                     source, &record, &conn->prev_yafstats);
5265                 continue;
5266 
5267               case SKI_RECTYPE_TOMBSTONE:
5268                 if (!ski_tombstone_next(fbuf, &record, source->probe, &err)){
5269                     TRACEMSG(2, ("'%s': %s and ski_tombstone_next() is FALSE",
5270                                  source->name, ski_rectype_name[rectype]));
5271                     break;      /* error */
5272                 }
5273                 continue;
5274 
5275               case SKI_RECTYPE_NF9SAMPLING:
5276                 if (!ski_nf9sampling_next(fbuf, &record, source->probe, &err)){
5277                     /* should have been able to read something */
5278                     TRACEMSG(2, ("'%s': %s and ski_nf9sampling_next() is FALSE",
5279                                  source->name, ski_rectype_name[rectype]));
5280                     break;      /* error */
5281                 }
5282                 continue;
5283 
5284               case SKI_RECTYPE_FIXREC:
5285                 assert(source->current_record);
5286                 record.fwd_rec = source->current_record;
5287                 record.rev_rec = &source->rvbuf;
5288                 rv = ski_fixrec_next(fbuf, &record, source->probe, &err);
5289                 if (-1 == rv) {
5290                     TRACEMSG(2, ("'%s': %s and ski_fixrec_next() returned -1",
5291                                  source->name, ski_rectype_name[rectype]));
5292                     break;      /* error */
5293                 }
5294                 ipfix_reader_update_circbuf(source, rv);
5295                 continue;
5296 
5297               case SKI_RECTYPE_YAFREC:
5298                 assert(source->current_record);
5299                 record.fwd_rec = source->current_record;
5300                 record.rev_rec = &source->rvbuf;
5301                 rv = ski_yafrec_next(fbuf, &record, source->probe, &err);
5302                 if (-1 == rv) {
5303                     TRACEMSG(2, ("'%s': %s and ski_yafrec_next() returned -1",
5304                                  source->name, ski_rectype_name[rectype]));
5305                     break;      /* error */
5306                 }
5307                 ipfix_reader_update_circbuf(source, rv);
5308                 continue;
5309 
5310               case SKI_RECTYPE_NF9REC:
5311                 assert(source->current_record);
5312                 record.fwd_rec = source->current_record;
5313                 record.rev_rec = &source->rvbuf;
5314                 rv = ski_nf9rec_next(fbuf, &record, source->probe, &err);
5315                 if (-1 == rv) {
5316                     TRACEMSG(2, ("'%s': %s and ski_nf9rec_next() returned -1",
5317                                  source->name, ski_rectype_name[rectype]));
5318                     break;      /* error */
5319                 }
5320                 ipfix_reader_update_circbuf(source, rv);
5321                 continue;
5322             } /* switch (rectype) */
5323 
5324             /* If we get here, stop reading from the current fbuf.
5325              * This may be because the fbuf is empty, because we are
5326              * shutting down, or due to an error. */
5327             break;
5328         }
5329         /* Finished with current IPFIX message, encountered an error
5330          * while processing message, or we are shutting down */
5331 
5332         /* Handle FB_ERROR_NLREAD and FB_ERROR_EOM returned by
5333          * fBufNext() in the same way as when they are returned by
5334          * fbListenerWait().
5335          *
5336          * FB_ERROR_NLREAD is also returned when a previously rejected
5337          * UDP client attempts to send more data. */
5338         if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD)
5339             || g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_EOM))
5340         {
5341             TRACEMSG(1, ("'%s': Ignoring %s: %s",
5342                          (conn ? source->name : "<UNKNOWN>"),
5343                          ((FB_ERROR_EOM == err->code)
5344                           ? "end-of-message" : "interrupted read"),
5345                          err->message));
5346             /* Do not free the fbuf here.  The fbuf is owned by the
5347              * listener, and will be freed when the listener is freed.
5348              * Calling fBufFree() here would cause fixbuf to forget
5349              * the current template, which would cause it to ignore
5350              * records until a new template is transmitted. */
5351             g_clear_error(&err);
5352             continue;
5353         }
5354 
5355         /* SK_IPFIX_ERROR_CONN indicates that a new UDP "connection"
5356          * was rejected by the appInit function in a multi-UDP
5357          * libfixbuf session.  Do not free the fbuf since we do not
5358          * have a connection yet; wait for another connection. */
5359         if (g_error_matches(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN)) {
5360             assert(IS_UDP);
5361             INFOMSG("Closing connection: %s", err->message);
5362             g_clear_error(&err);
5363             continue;
5364         }
5365 
5366         /* Handle shutdown events */
5367         if (base->destroyed) {
5368             break;
5369         }
5370 
5371         /* Source has stopped, loop for the next source. */
5372         if (conn && source->stopped) {
5373             continue;
5374         }
5375 
5376         /* The remainder of the code in this while() block assumes
5377          * that 'source' is valid, which is only true if 'conn' is
5378          * non-NULL.  Trap that here, just in case. */
5379         if (NULL == conn) {
5380             if (NULL == err) {
5381                 /* give up when error code is unknown */
5382                 NOTICEMSG("'<UNKNOWN>': fixbuf listener shutting down:"
5383                           " unknown error from fBufNext");
5384                 break;
5385             }
5386             DEBUGMSG("Ignoring packet: %s (d=%" PRIu32 ",c=%" PRId32 ")",
5387                      err->message, (uint32_t)err->domain, (int32_t)err->code);
5388             g_clear_error(&err);
5389             continue;
5390         }
5391 
5392         /* FB_ERROR_NETFLOWV9 indicates an anomalous netflow v9
5393          * record; these do not disturb fixbuf state, and so should be
5394          * ignored. */
5395         if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_NETFLOWV9)) {
5396             DEBUGMSG("'%s': Ignoring NetFlowV9 record: %s",
5397                      source->name, err->message);
5398             g_clear_error(&err);
5399             continue;
5400         }
5401 
5402         /* FB_ERROR_SFLOW indicates an anomalous sFlow
5403          * record; these do not disturb fixbuf state, and so should be
5404          * ignored. */
5405         if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_SFLOW)) {
5406             DEBUGMSG("'%s': Ignoring sFlow record: %s",
5407                      source->name, err->message);
5408             g_clear_error(&err);
5409             continue;
5410         }
5411 
5412         /* FB_ERROR_TMPL indicates a set references a template ID for
5413          * which there is no template.  Log and continue. */
5414         if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_TMPL)) {
5415             DEBUGMSG("'%s': Ignoring data set: %s",
5416                      source->name, err->message);
5417             g_clear_error(&err);
5418             continue;
5419         }
5420 
5421         /* FB_ERROR_IPFIX indicates invalid IPFIX.  We could simply
5422          * choose to log and continue; instead we choose to log, close
5423          * the connection, and continue. */
5424         if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX)) {
5425             if (IS_UDP) {
5426                 DEBUGMSG("'%s': Ignoring invalid IPFIX: %s",
5427                          source->name, err->message);
5428             } else {
5429                 INFOMSG("'%s': Closing connection; received invalid IPFIX: %s",
5430                         source->name, err->message);
5431                 fBufFree(fbuf);
5432                 fbuf = NULL;
5433             }
5434             g_clear_error(&err);
5435             continue;
5436         }
5437 
5438         /* FB_ERROR_EOF indicates that the connection associated with
5439          * this fBuf_t object has finished.  In this case, free the
5440          * fBuf_t object to close the connection.  Do not free the
5441          * fBuf_t for UDP connections, since these UDP-based fBuf_t
5442          * objects are freed with the listener. */
5443         if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_EOF)) {
5444             if (!IS_UDP) {
5445                 INFOMSG("'%s': Closing connection: %s",
5446                         source->name, err->message);
5447                 fBufFree(fbuf);
5448                 fbuf = NULL;
5449             }
5450             g_clear_error(&err);
5451             continue;
5452         }
5453 
5454         /* Handle an unexpected error generated by fixbuf */
5455         if (err && err->domain == FB_ERROR_DOMAIN) {
5456             if (IS_UDP) {
5457                 DEBUGMSG(("'%s': Ignoring UDP packet: %s"
5458                           " (d=%" PRIu32 ",c=%" PRId32 ")"),
5459                          source->name, err->message,
5460                          (uint32_t)err->domain, (int32_t)err->code);
5461             } else {
5462                 INFOMSG(("'%s': Closing connection: %s"
5463                          " (d=%" PRIu32 ",c=%" PRId32 ")"),
5464                         source->name, err->message,
5465                         (uint32_t)err->domain, (int32_t)err->code);
5466                 fBufFree(fbuf);
5467                 fbuf = NULL;
5468             }
5469             g_clear_error(&err);
5470             continue;
5471         }
5472 
5473         /* In the event of an unhandled error, end the thread. */
5474         if (NULL == err) {
5475             NOTICEMSG(("'%s': fixbuf listener shutting down:"
5476                        " unknown error from fBufNext"),
5477                       source->name);
5478         } else {
5479             NOTICEMSG(("'%s': fixbuf listener shutting down: %s"
5480                        " (d=%" PRIu32 ",c=%" PRId32 ")"),
5481                       source->name, err->message,
5482                       (uint32_t)err->domain, (int32_t)err->code);
5483             g_clear_error(&err);
5484         }
5485         break;
5486     }
5487 
5488     TRACEMSG(3, ("base %p exited while() loop", base));
5489 
5490     /* Free the fbuf if it exists.  (If it's UDP, it will be freed by
5491      * the destruction of the listener below.) */
5492     if (fbuf && !IS_UDP) {
5493         TRACEMSG(3, ("base %p calling fBufFree", base));
5494         fBufFree(fbuf);
5495     }
5496 
5497     /* Note that the thread is ending, and wait for
5498      * skIPFIXSourceDestroy() to mark this as destroyed */
5499     DEBUGMSG("fixbuf listener ending for [%s]:%s...",
5500              base->connspec->host ? base->connspec->host : "*",
5501              base->connspec->svc);
5502     pthread_mutex_lock(&base->mutex);
5503     while (!base->destroyed) {
5504         pthread_cond_wait(&base->cond, &base->mutex);
5505     }
5506 
5507     TRACEMSG(3, ("base %p is set to destroyed", base));
5508 
5509     /* Destroy the fbListener_t object.  This destroys the fbuf if the
5510      * stream is UDP. */
5511     ipfixSourceBaseFreeListener(base);
5512 
5513     /* Notify skIPFIXSourceDestroy() that the thread is ending */
5514     base->running = 0;
5515     pthread_cond_signal(&base->cond);
5516     DEBUGMSG("fixbuf listener ended for [%s]:%s.",
5517              base->connspec->host ? base->connspec->host : "*",
5518              base->connspec->svc);
5519 
5520     pthread_mutex_unlock(&base->mutex);
5521 
5522     TRACE_RETURN(NULL);
5523 #undef IS_UDP
5524 }
5525 
5526 
5527 /*
5528  *    Requests a record from the file-based IPFIX source 'source'.
5529  *
5530  *    Returns 0 on success, -1 on failure.
5531  */
5532 int
ipfixSourceGetRecordFromFile(skIPFIXSource_t * source,rwRec * ipfix_rec)5533 ipfixSourceGetRecordFromFile(
5534     skIPFIXSource_t        *source,
5535     rwRec                  *ipfix_rec)
5536 {
5537     ski_record_t record;
5538     GError *err = NULL;
5539     int rv;
5540 
5541     TRACE_ENTRY;
5542 
5543     /* Reading from a file */
5544     pthread_mutex_lock(&source->base->mutex);
5545     assert(source->readbuf);
5546 
5547     if (source->reverse) {
5548         /* A reverse record exists from the previous flow */
5549         memcpy(ipfix_rec, &source->rvbuf, sizeof(*ipfix_rec));
5550         ++source->reverse_flows;
5551         source->reverse = 0;
5552     } else {
5553         /* Initialize the control variable for the do{}while() loop.
5554          * 0: ignore; 1: uniflow; 2: biflow; -1: error */
5555         rv = 0;
5556         do {
5557             /* Similar to the switch() block in ipfix_reader() above */
5558             switch (ski_rectype_next(source->readbuf, &record, &err)) {
5559               case SKI_RECTYPE_ERROR:
5560                 rv = -1;
5561                 break;          /* error */
5562 
5563               case SKI_RECTYPE_NF9SAMPLING:
5564               case SKI_RECTYPE_IGNORE:
5565                 if (!ski_ignore_next(
5566                         source->readbuf, &record, source->probe, &err))
5567                 {
5568                     /* should have been able to read something */
5569                     TRACEMSG(2, ("'%s': %s and ski_ignore_next() is FALSE",
5570                                  source->name,
5571                                  ski_rectype_name[record.rectype]));
5572                     rv = -1;
5573                     break;      /* error */
5574                 }
5575                 continue;
5576 
5577               case SKI_RECTYPE_YAFSTATS:
5578                 if (!ski_yafstats_next(
5579                         source->readbuf, &record, source->probe, &err))
5580                 {
5581                     /* should have been able to read the stats */
5582                     TRACEMSG(2, ("'%s': %s and ski_yafstats_next() is FALSE",
5583                                  source->name,
5584                                  ski_rectype_name[record.rectype]));
5585                     rv = -1;
5586                     break;      /* error */
5587                 }
5588                 ski_yafstats_update_source(
5589                     source, &record, &source->prev_yafstats);
5590                 continue;
5591 
5592               case SKI_RECTYPE_TOMBSTONE:
5593                 if (!ski_tombstone_next(
5594                         source->readbuf, &record, source->probe, &err))
5595                 {
5596                     TRACEMSG(2, ("'%s': %s and ski_tombstone_next() is FALSE",
5597                                  source->name,
5598                                  ski_rectype_name[record.rectype]));
5599                     rv = -1;
5600                     break;      /* error */
5601                 }
5602                 continue;
5603 
5604               case SKI_RECTYPE_FIXREC:
5605                 record.fwd_rec = ipfix_rec;
5606                 record.rev_rec = &source->rvbuf;
5607                 rv = ski_fixrec_next(source->readbuf, &record,
5608                                      source->probe, &err);
5609                 if (rv == 0) {
5610                     ++source->ignored_flows;
5611                 }
5612                 break;
5613 
5614               case SKI_RECTYPE_YAFREC:
5615                 record.fwd_rec = ipfix_rec;
5616                 record.rev_rec = &source->rvbuf;
5617                 rv = ski_yafrec_next(source->readbuf, &record,
5618                                      source->probe, &err);
5619                 if (rv == 0) {
5620                     ++source->ignored_flows;
5621                 }
5622                 break;
5623 
5624               case SKI_RECTYPE_NF9REC:
5625                 record.fwd_rec = ipfix_rec;
5626                 record.rev_rec = &source->rvbuf;
5627                 rv = ski_nf9rec_next(source->readbuf, &record,
5628                                      source->probe, &err);
5629                 if (rv == 0) {
5630                     ++source->ignored_flows;
5631                 }
5632                 break;
5633 
5634               default:
5635                 skAppPrintErr("Unexpected record type");
5636                 skAbort();
5637             }
5638         } while (rv == 0);  /* Continue while current record is ignored */
5639 
5640         if (rv == -1) {
5641             /* End of file or other problem */
5642             g_clear_error(&err);
5643             pthread_mutex_unlock(&source->base->mutex);
5644             TRACE_RETURN(-1);
5645         }
5646 
5647         assert(rv == 1 || rv == 2);
5648         ++source->forward_flows;
5649 
5650         /* We have the next flow.  Set reverse if there is a
5651          * reverse record.  */
5652         source->reverse = (rv == 2);
5653     }
5654 
5655     pthread_mutex_unlock(&source->base->mutex);
5656 
5657     TRACE_RETURN(0);
5658 }
5659 
5660 
5661 
5662 
5663 /*
5664  *    The check-struct application calls the skiCheckDataStructure()
5665  *    function, and that function requires access to the C structures
5666  *    and templates that are local to this file.
5667  *
5668  *    The check-struct.c file contains both the
5669  *    skiCheckDataStructure() function and a main() function to use
5670  *    for the application.  The skiCheckDataStructure() is defined
5671  *    when SKIPFIX_SOURCE is defined, otherwise the main() function is
5672  *    defined.
5673  */
5674 #include "check-struct.c"
5675 
5676 
5677 /*
5678 ** Local Variables:
5679 ** mode:c
5680 ** indent-tabs-mode:nil
5681 ** c-basic-offset:4
5682 ** End:
5683 */
5684