1 /*
2 ** Copyright (C) 2007-2020 by Carnegie Mellon University.
3 **
4 ** @OPENSOURCE_LICENSE_START@
5 ** See license information in ../../LICENSE.txt
6 ** @OPENSOURCE_LICENSE_END@
7 */
8
9 /*
10 * skipfix.c
11 *
12 * This file and ipfixsource.c are tightly coupled, and together
13 * they read IPFIX records and convert them to SiLK flow records.
14 *
 * This file primarily handles the conversion, and it is where the
 * reading functions exist.
17 *
 * The ipfixsource.c file is primarily about setting up and tearing
 * down the data structures used when processing IPFIX.
20 */
21
22 #define SKIPFIX_SOURCE 1
23 #include <silk/silk.h>
24
25 RCSIDENT("$SiLK: skipfix.c 7af5eab585e4 2020-04-15 15:56:48Z mthomas $");
26
27 #include "ipfixsource.h"
28 #include <silk/skipaddr.h>
29 #include <silk/skthread.h>
30
31 #ifdef SKIPFIX_TRACE_LEVEL
32 #define TRACEMSG_LEVEL SKIPFIX_TRACE_LEVEL
33 #endif
34 #define TRACEMSG(lvl, msg) TRACEMSG_TO_TRACEMSGLVL(lvl, msg)
35 #include <silk/sktracemsg.h>
36
37
38 /* LOCAL DEFINES AND TYPEDEFS */
39
40 /*
41 * Whether to process the subTemplateList element of the Tombstone
42 * record ski_tombstone_t.
43 */
/* Enabled (1) by default; a definition supplied by the build wins. */
#if !defined(SKIPFIX_ENABLE_TOMBSTONE_TIMES)
#define SKIPFIX_ENABLE_TOMBSTONE_TIMES  1
#endif  /* !defined(SKIPFIX_ENABLE_TOMBSTONE_TIMES) */
47
48 /*
49 * A context is added to incoming templates to assist when decoding
50 * records. The context is a 32-bit bitmap, and the following
51 * determines whether to allocate the bitmap or to use the pointer
52 * itself and cast the pointer to a uintptr_t.
53 */
/* Allocate the bitmap only when a uintptr_t is narrower than 32 bits */
#ifndef SKIPFIX_ALLOCATE_BITMAP
#  if SK_SIZEOF_UINTPTR_T >= 4
#    define SKIPFIX_ALLOCATE_BITMAP 0
#  else
#    define SKIPFIX_ALLOCATE_BITMAP 1
#  endif
#endif

#if SKIPFIX_ALLOCATE_BITMAP
/* The context points to a heap-allocated uint32_t holding the bitmap */
#  define BMAP_TYPE  uint32_t
/* Complete printf() conversion (including the '%') for a BMAP_TYPE */
#  define BMAP_PRI   "%#010" PRIx32
/* Get the bitmap stored on 'm_template'; 0 when no context is set */
#  define BMAP_TMPL_CTX_GET(m_template)                         \
    ((fbTemplateGetContext(m_template) != NULL)                 \
     ? *((BMAP_TYPE *)fbTemplateGetContext(m_template))         \
     : 0)
/* Store 'm_bmap' in freshly allocated memory, set *m_ctx to that
 * memory and *m_free_fn to free().  When the allocation fails both
 * outputs are set to NULL so they are never left unwritten;
 * BMAP_TMPL_CTX_GET() then reports a bitmap of 0.  The local's name
 * is deliberately unusual so it cannot capture a caller's 'm_bmap'
 * argument. */
#  define BMAP_TMPL_CTX_SET(m_ctx, m_free_fn, m_bmap)                   \
    do {                                                                \
        BMAP_TYPE *bmap_ctx_set_p_                                      \
            = (BMAP_TYPE *)malloc(sizeof(*bmap_ctx_set_p_));            \
        if (bmap_ctx_set_p_) {                                          \
            *bmap_ctx_set_p_ = (m_bmap);                                \
            *(m_ctx) = bmap_ctx_set_p_;                                 \
            *(m_free_fn) = free;                                        \
        } else {                                                        \
            *(m_ctx) = NULL;                                            \
            *(m_free_fn) = NULL;                                        \
        }                                                               \
    } while(0)
#else
/* The bitmap is stored directly in the context pointer's value */
#  define BMAP_TYPE  uintptr_t
/* Complete printf() conversion (including the '%') for a BMAP_TYPE */
#  ifdef PRIxPTR
#    define BMAP_PRI "%#010" PRIxPTR
#  elif SK_SIZEOF_UINTPTR_T > 4
#    define BMAP_PRI "%#010" PRIx64
#  else
#    define BMAP_PRI "%#010" PRIx32
#  endif
/* Get the bitmap stored on 'm_template'.  The expansion is fully
 * parenthesized so it is safe inside any larger expression. */
#  define BMAP_TMPL_CTX_GET(m_template)                 \
    ((BMAP_TYPE)fbTemplateGetContext(m_template))
/* Store 'm_bmap' as the pointer value in *m_ctx; nothing is
 * allocated, so *m_free_fn is NULL.  The value is converted to
 * BMAP_TYPE first so a wider integer argument is narrowed to
 * pointer width before the cast to a pointer. */
#  define BMAP_TMPL_CTX_SET(m_ctx, m_free_fn, m_bmap)   \
    do {                                                \
        *(m_ctx) = (void *)(BMAP_TYPE)(m_bmap);         \
        *(m_free_fn) = NULL;                            \
    } while(0)
#endif  /* SKIPFIX_ALLOCATE_BITMAP */
95
96
/*
 *  Saturate 'm_val' at 'm_max':  when 'm_val' is greater than
 *  'm_max' the result is 'm_max'; otherwise the result is 'm_val'
 *  masked by 'm_max', that is (m_max & m_val).  Each argument is
 *  evaluated more than once.
 */
#define CLAMP_VAL(m_val, m_max)                                 \
    (((m_val) <= (m_max)) ? ((m_max) & (m_val)) : (m_max))
#define CLAMP_VAL16(m_val)   CLAMP_VAL((m_val), UINT16_MAX)
#define CLAMP_VAL32(m_val)   CLAMP_VAL((m_val), UINT32_MAX)
105
/* 2^32 as an intmax_t: one more than UINT32_MAX */
#define ROLLOVER32  (INTMAX_C(1) << 32)
108
/*
 *  NetFlow V9 rollover threshold, in milliseconds.  When the
 *  magnitude of the difference between the sysUpTime and the
 *  flowStartSysUpTime exceeds this value, assume one of the two
 *  values has rolled over.  The value equals INT32_MAX, held as an
 *  intmax_t.
 */
#define MAXIMUM_FLOW_TIME_DEVIATION  ((INTMAX_C(1) << 31) - 1)
116
117
/* flowEndReason codes; these five are defined by the IPFIX spec */
#define SKI_END_IDLE                            0x01
#define SKI_END_ACTIVE                          0x02
#define SKI_END_CLOSED                          0x03
#define SKI_END_FORCED                          0x04
#define SKI_END_RESOURCE                        0x05

/* flowEndReason code for flows that SiLK ignores */
#define SKI_END_YAF_INTERMEDIATE_FLOW           0x1f

/* Selects the code portion of flowEndReason, leaving out the bit
 * that follows it */
#define SKI_END_MASK                            0x1f

/* flowEndReason flag: the flow is a continuation */
#define SKI_END_ISCONT                          0x80

/* flowAttributes flag */
#define SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE  0x01
138
139
140 /* ********** Template Bitmap to Record Type ********** */
141
142 /*
143 * Each IPFIX Template is examined by skiTemplateCallbackCtx() when
144 * it is initially received to determine how to process data
145 * represented by the template.
146 *
 * This up-front processing should reduce the overhead of needing
 * to examine the content of every record, but at the expense of
 * looking at elements or templates which may never be used.
150 * Overall this should be a benefit as long as the number of
151 * records received is much higher than the number of templates
152 * received (in the TCP case, the templates are only sent once).
153 *
154 * When skiTemplateCallbackCtx() examines a Data Template (that is,
155 * a non-Options Template), it may determine that a specialized
 * function should be used to read the data. The lower 3 bits of
 * the bitmap determine whether it was able to determine this, as
 * explained in the next paragraphs:
159 *
160 * xx1. If the least significant bit is high, the general purpose
161 * ski_fixrec_next() function is used to read the data, and the
162 * other 31 bits are an indication what the template contains.
163 *
164 * x10. If the two LSB are 10, the data looks like NetFlow v9 and
165 * the ski_nf9rec_next() function is used to read the data. See
166 * the note below.
167 *
168 * 100. If the three LSB are 100, the data looks like YAF or SiLK
169 * data and the ski_yafrec_next() function is used to read the
170 * data. See the note below.
171 *
172 * NOTE: For the NetFlow v9 and YAF/SiLK data, the lower 16 bits of
173 * the bitmap represent the template ID that should be used to
174 * process the data. The upper 16 bits provide other bits to
175 * represent what may be present in the template.
176 *
177 * The bitmap is used to determine the ski_rectype_t value that is
178 * returned by ski_rectype_next().
179 */
180
/* The three least significant bits of the bitmap choose the reader */
#define BMAP_RECTYPE_MASK       0x07

/* Low bits 100: YAF or SiLK data */
#define BMAP_RECTYPE_YAFREC     0x04

/* Low bits x10: NetFlow v9 data */
#define BMAP_RECTYPE_NF9REC     0x02

/* Low bits xx1: general purpose reader */
#define BMAP_RECTYPE_FIXREC     0x01
188
189
190 /*
191 * ********** Determining What IEs a Template Contains **********
192 *
193 * The following TMPL_BIT_ieFoo macros are for setting and getting
194 * the bit associated with the ieFoo element in the bitmap
195 * associated with the template.
196 */
/* bits 1-2: start/end as milliseconds */
#define TMPL_BIT_flowStartMilliseconds          UINT32_C(0x00000002)
#define TMPL_BIT_flowEndMilliseconds            UINT32_C(0x00000004)

/* bits 3-4: system init time and start as SysUpTime */
#define TMPL_BIT_systemInitTimeMilliseconds     UINT32_C(0x00000008)
#define TMPL_BIT_flowStartSysUpTime             UINT32_C(0x00000010)

/* bits 5-6: start/end as microseconds */
#define TMPL_BIT_flowStartMicroseconds          UINT32_C(0x00000020)
#define TMPL_BIT_flowEndMicroseconds            UINT32_C(0x00000040)

/* bits 7-8: start/end as nanoseconds */
#define TMPL_BIT_flowStartNanoseconds           UINT32_C(0x00000080)
#define TMPL_BIT_flowEndNanoseconds             UINT32_C(0x00000100)

/* bits 9-10: start/end as seconds */
#define TMPL_BIT_flowStartSeconds               UINT32_C(0x00000200)
#define TMPL_BIT_flowEndSeconds                 UINT32_C(0x00000400)

/* bits 11-12: start/end as delta microseconds */
#define TMPL_BIT_flowStartDeltaMicroseconds     UINT32_C(0x00000800)
#define TMPL_BIT_flowEndDeltaMicroseconds       UINT32_C(0x00001000)

/* bits 13-14: durations */
#define TMPL_BIT_flowDurationMicroseconds       UINT32_C(0x00002000)
#define TMPL_BIT_flowDurationMilliseconds       UINT32_C(0x00004000)

/* bit 15: either sourceIPv4Address or destinationIPv4Address */
#define TMPL_BIT_sourceIPv4Address              UINT32_C(0x00008000)
/* bit 16: either sourceIPv6Address or destinationIPv6Address */
#define TMPL_BIT_sourceIPv6Address              UINT32_C(0x00010000)

/* bits 17-19: firewall events */
#define TMPL_BIT_firewallEvent                  UINT32_C(0x00020000)
#define TMPL_BIT_NF_F_FW_EVENT                  UINT32_C(0x00040000)
#define TMPL_BIT_NF_F_FW_EXT_EVENT              UINT32_C(0x00080000)

/* bits 20-24: collection and observation times */
#define TMPL_BIT_collectionTimeMilliseconds     UINT32_C(0x00100000)
#define TMPL_BIT_observationTimeMicroseconds    UINT32_C(0x00200000)
#define TMPL_BIT_observationTimeMilliseconds    UINT32_C(0x00400000)
#define TMPL_BIT_observationTimeNanoseconds     UINT32_C(0x00800000)
#define TMPL_BIT_observationTimeSeconds         UINT32_C(0x01000000)

/* bit 25: either icmpTypeCodeIPv4 or icmpTypeCodeIPv6 */
#define TMPL_BIT_icmpTypeCodeIPv4               UINT32_C(0x02000000)
/* bit 26: at least one of icmpTypeIPv4, icmpCodeIPv4, icmpTypeIPv6,
 * icmpCodeIPv6 */
#define TMPL_BIT_icmpTypeIPv4                   UINT32_C(0x04000000)

/* bits 27-28: vlan IDs */
#define TMPL_BIT_postVlanId                     UINT32_C(0x08000000)
#define TMPL_BIT_reverseVlanId                  UINT32_C(0x10000000)

/* bits 29-30: reverse TCP flags */
#define TMPL_BIT_reverseInitialTCPFlags         UINT32_C(0x20000000)
#define TMPL_BIT_reverseTcpControlBits          UINT32_C(0x40000000)
243
244
245 /*
246 * The following are not stored on the bitmap that is set as the
247 * template's context, but they are used when the template is
248 * examined initially.
249 */
250
/* bits 32-35: forward volume, as delta and as total */
#define TMPL_BIT_octetDeltaCount                UINT64_C(0x0000000100000000)
#define TMPL_BIT_packetDeltaCount               UINT64_C(0x0000000200000000)
#define TMPL_BIT_octetTotalCount                UINT64_C(0x0000000400000000)
#define TMPL_BIT_packetTotalCount               UINT64_C(0x0000000800000000)
/* bit 36: either initiatorOctets or initiatorPackets */
#define TMPL_BIT_initiatorOctets                UINT64_C(0x0000001000000000)
/* bit 37: either responderOctets or responderPackets */
#define TMPL_BIT_responderOctets                UINT64_C(0x0000002000000000)
/* bit 38: either reverseOctetDeltaCount or reversePacketDeltaCount */
#define TMPL_BIT_reverseOctetDeltaCount         UINT64_C(0x0000004000000000)
/* bits 39-41 */
#define TMPL_BIT_initialTCPFlags                UINT64_C(0x0000008000000000)
#define TMPL_BIT_reverseFlowDeltaMilliseconds   UINT64_C(0x0000010000000000)
#define TMPL_BIT_subTemplateMultiList           UINT64_C(0x0000020000000000)
/* bit 42: either postOctetDeltaCount or postPacketDeltaCount */
#define TMPL_BIT_postOctetDeltaCount            UINT64_C(0x0000040000000000)
/* bit 43: either postOctetTotalCount or postPacketTotalCount */
#define TMPL_BIT_postOctetTotalCount            UINT64_C(0x0000080000000000)
/* bit 44: certToolId (YAF 2.11) */
#define TMPL_BIT_certToolId                     UINT64_C(0x0000100000000000)
/* bit 45: exportingProcessId; it and observationTimeSeconds are the
 * tombstone_access values for YAF 2.10 */
#define TMPL_BIT_exportingProcessId             UINT64_C(0x0000200000000000)
277
278 /* The following are only checked in options templates, so the bit
279 * position here can repeat those above */
/* bit 1: both IE49,IE50 (samplerMode, samplerRandomInterval) are
 * present */
#define TMPL_BIT_samplerMode                    UINT32_C(0x00000002)
/* bit 2: both IE35,IE34 (samplingAlgorithm, samplingInterval) are
 * present */
#define TMPL_BIT_samplingAlgorithm              UINT32_C(0x00000004)
/* bits 3-5 */
#define TMPL_BIT_flowTableFlushEventCount       UINT32_C(0x00000008)
#define TMPL_BIT_flowTablePeakCount             UINT32_C(0x00000010)
#define TMPL_BIT_tombstoneId                    UINT32_C(0x00000020)
287
288 /*
289 * Groupings of various bits.
290 */
291
/* The start, end, duration, system-init, collection, and observation
 * time bits */
#define TMPL_MASK_GAUNTLET_OF_TIME              \
    (TMPL_BIT_flowStartMilliseconds         |   \
     TMPL_BIT_flowEndMilliseconds           |   \
     TMPL_BIT_systemInitTimeMilliseconds    |   \
     TMPL_BIT_flowStartSysUpTime            |   \
     TMPL_BIT_flowStartMicroseconds         |   \
     TMPL_BIT_flowEndMicroseconds           |   \
     TMPL_BIT_flowStartNanoseconds          |   \
     TMPL_BIT_flowEndNanoseconds            |   \
     TMPL_BIT_flowStartSeconds              |   \
     TMPL_BIT_flowEndSeconds                |   \
     TMPL_BIT_flowStartDeltaMicroseconds    |   \
     TMPL_BIT_flowEndDeltaMicroseconds      |   \
     TMPL_BIT_flowDurationMicroseconds      |   \
     TMPL_BIT_flowDurationMilliseconds      |   \
     TMPL_BIT_collectionTimeMilliseconds    |   \
     TMPL_BIT_observationTimeMicroseconds   |   \
     TMPL_BIT_observationTimeMilliseconds   |   \
     TMPL_BIT_observationTimeNanoseconds    |   \
     TMPL_BIT_observationTimeSeconds)

/* Either IP address family */
#define TMPL_MASK_IPADDRESS                     \
    (TMPL_BIT_sourceIPv6Address | TMPL_BIT_sourceIPv4Address)

/* Forward volume bits, delta and total */
#define TMPL_MASK_VOLUME_YAF                    \
    (TMPL_BIT_octetDeltaCount   |               \
     TMPL_BIT_packetDeltaCount  |               \
     TMPL_BIT_octetTotalCount   |               \
     TMPL_BIT_packetTotalCount)

/* Millisecond start and end times */
#define TMPL_MASK_TIME_MILLI_YAF                \
    (TMPL_BIT_flowEndMilliseconds | TMPL_BIT_flowStartMilliseconds)

/* The YAF volume bits plus the initiator, responder, and post bits */
#define TMPL_MASK_VOLUME_NF9                    \
    (TMPL_MASK_VOLUME_YAF           |           \
     TMPL_BIT_initiatorOctets       |           \
     TMPL_BIT_responderOctets       |           \
     TMPL_BIT_postOctetDeltaCount   |           \
     TMPL_BIT_postOctetTotalCount)

/* System init time and start time as SysUpTime */
#define TMPL_MASK_TIME_SYSUP                    \
    (TMPL_BIT_flowStartSysUpTime | TMPL_BIT_systemInitTimeMilliseconds)

/* Millisecond start time and millisecond observation time */
#define TMPL_MASK_TIME_MILLI_NF9                \
    (TMPL_BIT_observationTimeMilliseconds | TMPL_BIT_flowStartMilliseconds)

/* Union of the two preceding time masks */
#define TMPL_MASK_TIME_NF9                      \
    (TMPL_MASK_TIME_MILLI_NF9 | TMPL_MASK_TIME_SYSUP)

/* Bits grouped for the YAF/SiLK record type */
#define TMPL_MASK_YAFREC                        \
    (TMPL_MASK_IPADDRESS                    |   \
     TMPL_MASK_TIME_MILLI_YAF               |   \
     TMPL_MASK_VOLUME_YAF                   |   \
     TMPL_BIT_icmpTypeCodeIPv4              |   \
     TMPL_BIT_initialTCPFlags               |   \
     TMPL_BIT_reverseOctetDeltaCount        |   \
     TMPL_BIT_reverseFlowDeltaMilliseconds  |   \
     TMPL_BIT_reverseVlanId                 |   \
     TMPL_BIT_reverseInitialTCPFlags        |   \
     TMPL_BIT_reverseTcpControlBits         |   \
     TMPL_BIT_subTemplateMultiList)

/* Bits grouped for the NetFlow v9 record type */
#define TMPL_MASK_NF9REC                        \
    (TMPL_MASK_IPADDRESS            |           \
     TMPL_MASK_TIME_NF9             |           \
     TMPL_MASK_VOLUME_NF9           |           \
     TMPL_BIT_icmpTypeCodeIPv4      |           \
     TMPL_BIT_icmpTypeIPv4          |           \
     TMPL_BIT_postVlanId            |           \
     TMPL_BIT_firewallEvent         |           \
     TMPL_BIT_NF_F_FW_EVENT         |           \
     TMPL_BIT_NF_F_FW_EXT_EVENT)
372
/*
 *  Assert that the canonical name of the element 'aini_ie' is
 *  'aini_name'.  The TMPL_BIT_<aini_name> macro must exist and be
 *  non-zero.
 */
#define ASSERT_IE_NAME_IS(aini_ie, aini_name)                           \
    assert((TMPL_BIT_ ## aini_name) != 0                                \
           && strcmp((aini_ie)->ref.canon->ref.name, #aini_name) == 0)
376
377
378 /*
379 * ********** "Give Me Everything" Template for Import **********
380 *
381 * This is the template and a matching struct used for reading
382 * generic flow records. The template and struct are used by
383 * ski_fixrec_next() when reading data.
384 *
385 * The template contains all the IPFIX fields that SiLK supports
386 * when importing data.
387 *
388 * The type for these records is SKI_RECTYPE_FIXREC
389 */
390
/* Template ID that accompanies ski_fixrec_spec[] */
#define SKI_FIXREC_TID        0xAFEB

/* Number of paddingOctets placed after reverseFlowAttributes.  In
 * ski_fixrec_t the padding occupies bytes 366-367, so the
 * subTemplateMultiList member that follows starts at byte 368. */
#define SKI_FIXREC_PADDING  2

/*
 *  Element list for the "give me everything" template.  Each entry
 *  is an element name, its length in octets, and a flags value that
 *  is 0 for every entry here.  The order and lengths of the entries
 *  mirror the members of ski_fixrec_t; keep the two in sync.
 */
static fbInfoElementSpec_t ski_fixrec_spec[] = {
    /* Ports, Protocol */
    { (char*)"sourceTransportPort",                2, 0 },
    { (char*)"destinationTransportPort",           2, 0 },
    { (char*)"protocolIdentifier",                 1, 0 },
    /* TCP Flags (reverse values below) */
    { (char*)"tcpControlBits",                     1, 0 },
    { (char*)"initialTCPFlags",                    1, 0 },
    { (char*)"unionTCPFlags",                      1, 0 },
    /* Router interfaces */
    { (char*)"ingressInterface",                   4, 0 },
    { (char*)"egressInterface",                    4, 0 },
    /* Volume, as Delta (reverse values below) */
    { (char*)"packetDeltaCount",                   8, 0 },
    { (char*)"octetDeltaCount",                    8, 0 },
    /* Volume, as Total (reverse values below) */
    { (char*)"packetTotalCount",                   8, 0 },
    { (char*)"octetTotalCount",                    8, 0 },
    /* Volume, yet more */
    { (char*)"initiatorPackets",                   8, 0 },
    { (char*)"initiatorOctets",                    8, 0 },
    { (char*)"responderPackets",                   8, 0 },
    { (char*)"responderOctets",                    8, 0 },
    /* Flow attributes (reverse value below) */
    { (char*)"flowAttributes",                     2, 0 },
    /* SiLK Fields */
    { (char*)"silkAppLabel",                       2, 0 },
    { (char*)"silkFlowSensor",                     2, 0 },
    { (char*)"silkFlowType",                       1, 0 },
    { (char*)"silkTCPState",                       1, 0 },
    /* Vlan IDs */
    { (char*)"vlanId",                             2, 0 },
    { (char*)"postVlanId",                         2, 0 },
    /* Firewall events */
    { (char*)"firewallEvent",                      1, 0 },
    { (char*)"NF_F_FW_EVENT",                      1, 0 },
    { (char*)"NF_F_FW_EXT_EVENT",                  2, 0 },
    /* ICMP */
    { (char*)"icmpTypeCodeIPv4",                   2, 0 },
    { (char*)"icmpTypeIPv4",                       1, 0 },
    { (char*)"icmpCodeIPv4",                       1, 0 },
    { (char*)"icmpTypeCodeIPv6",                   2, 0 },
    { (char*)"icmpTypeIPv6",                       1, 0 },
    { (char*)"icmpCodeIPv6",                       1, 0 },
    /* Millisecond start and end (epoch) (native time) */
    { (char*)"flowStartMilliseconds",              8, 0 },
    { (char*)"flowEndMilliseconds",                8, 0 },
    /* SysUpTime, used to handle Netflow v9 SysUpTime offset times */
    { (char*)"systemInitTimeMilliseconds",         8, 0 },
    { (char*)"flowStartSysUpTime",                 4, 0 },
    { (char*)"flowEndSysUpTime",                   4, 0 },
    /* Microsecond start and end (RFC1305-style) */
    { (char*)"flowStartMicroseconds",              8, 0 },
    { (char*)"flowEndMicroseconds",                8, 0 },
    /* Nanosecond start and end (RFC1305-style) */
    { (char*)"flowStartNanoseconds",               8, 0 },
    { (char*)"flowEndNanoseconds",                 8, 0 },
    /* Second start and end */
    { (char*)"flowStartSeconds",                   4, 0 },
    { (char*)"flowEndSeconds",                     4, 0 },
    /* Microsecond delta start and end */
    { (char*)"flowStartDeltaMicroseconds",         4, 0 },
    { (char*)"flowEndDeltaMicroseconds",           4, 0 },
    /* Flow durations */
    { (char*)"flowDurationMicroseconds",           4, 0 },
    { (char*)"flowDurationMilliseconds",           4, 0 },
    /* Collection time and Observation time */
    { (char*)"collectionTimeMilliseconds",         8, 0 },
    { (char*)"observationTimeMilliseconds",        8, 0 },
    { (char*)"observationTimeMicroseconds",        8, 0 },
    { (char*)"observationTimeNanoseconds",         8, 0 },
    { (char*)"observationTimeSeconds",             4, 0 },
    /* IPv4 Addresses */
    { (char*)"sourceIPv4Address",                  4, 0 },
    { (char*)"destinationIPv4Address",             4, 0 },
    { (char*)"ipNextHopIPv4Address",               4, 0 },
    /* IPv6 Addresses */
    { (char*)"sourceIPv6Address",                 16, 0 },
    { (char*)"destinationIPv6Address",            16, 0 },
    { (char*)"ipNextHopIPv6Address",              16, 0 },
    /* Volumes as flow leaves the router or middlebox */
    { (char*)"postPacketDeltaCount",               8, 0 },
    { (char*)"postOctetDeltaCount",                8, 0 },
    { (char*)"postPacketTotalCount",               8, 0 },
    { (char*)"postOctetTotalCount",                8, 0 },
    /* End reason */
    { (char*)"flowEndReason",                      1, 0 },
    /* TCP Flags (reverse) */
    { (char*)"reverseTcpControlBits",              1, 0 },
    { (char*)"reverseInitialTCPFlags",             1, 0 },
    { (char*)"reverseUnionTCPFlags",               1, 0 },
    /* Initial packet roundtrip */
    { (char*)"reverseFlowDeltaMilliseconds",       4, 0 },
    /* Volume, as Delta (reverse) */
    { (char*)"reversePacketDeltaCount",            8, 0 },
    { (char*)"reverseOctetDeltaCount",             8, 0 },
    /* Volume, as Total (reverse) */
    { (char*)"reversePacketTotalCount",            8, 0 },
    { (char*)"reverseOctetTotalCount",             8, 0 },
    /* Vlan IDs (reverse) */
    { (char*)"reverseVlanId",                      2, 0 },
    { (char*)"reversePostVlanId",                  2, 0 },
    /* Flow attributes (reverse) */
    { (char*)"reverseFlowAttributes",              2, 0 },
    /* Padding is present only when SKI_FIXREC_PADDING is non-zero */
#if SKI_FIXREC_PADDING != 0
    { (char*)"paddingOctets",     SKI_FIXREC_PADDING, 0 },
#endif
    /* The list; it fills the 'stml' member of ski_fixrec_t */
    { (char*)"subTemplateMultiList",               0, 0 },
    FB_IESPEC_NULL
};
505
/*
 *  Record that matches ski_fixrec_spec[]: the members appear in the
 *  same order and have the same sizes as the entries of that array.
 *  The numbers in the trailing comments are byte offsets from the
 *  start of the record.
 */
typedef struct ski_fixrec_st {
    uint16_t        sourceTransportPort;            /*   0-  1 */
    uint16_t        destinationTransportPort;       /*   2-  3 */

    uint8_t         protocolIdentifier;             /*   4     */
    uint8_t         tcpControlBits;                 /*   5     */
    uint8_t         initialTCPFlags;                /*   6     */
    uint8_t         unionTCPFlags;                  /*   7     */

    uint32_t        ingressInterface;               /*   8- 11 */
    uint32_t        egressInterface;                /*  12- 15 */

    uint64_t        packetDeltaCount;               /*  16- 23 */
    uint64_t        octetDeltaCount;                /*  24- 31 */

    uint64_t        packetTotalCount;               /*  32- 39 */
    uint64_t        octetTotalCount;                /*  40- 47 */

    uint64_t        initiatorPackets;               /*  48- 55 */
    uint64_t        initiatorOctets;                /*  56- 63 */

    uint64_t        responderPackets;               /*  64- 71 */
    uint64_t        responderOctets;                /*  72- 79 */

    /* Flow attribute flags (reverse value below) */
    uint16_t        flowAttributes;                 /*  80- 81 */

    /* SiLK values */
    uint16_t        silkAppLabel;                   /*  82- 83 */
    uint16_t        silkFlowSensor;                 /*  84- 85 */
    uint8_t         silkFlowType;                   /*  86     */
    uint8_t         silkTCPState;                   /*  87     */

    /* vlan IDs (reverse values below) */
    uint16_t        vlanId;                         /*  88- 89 */
    uint16_t        postVlanId;                     /*  90- 91 */

    /* Firewall events */
    uint8_t         firewallEvent;                  /*  92     */
    uint8_t         NF_F_FW_EVENT;                  /*  93     */
    uint16_t        NF_F_FW_EXT_EVENT;              /*  94- 95 */

    /* ICMP */
    uint16_t        icmpTypeCodeIPv4;               /*  96- 97 */
    uint8_t         icmpTypeIPv4;                   /*  98     */
    uint8_t         icmpCodeIPv4;                   /*  99     */
    uint16_t        icmpTypeCodeIPv6;               /* 100-101 */
    uint8_t         icmpTypeIPv6;                   /* 102     */
    uint8_t         icmpCodeIPv6;                   /* 103     */

    /* Time can be represented in many different formats: */
    uint64_t        flowStartMilliseconds;          /* 104-111 */
    uint64_t        flowEndMilliseconds;            /* 112-119 */

    /* SysUpTime: used for flow{Start,End}SysUpTime calculations.
     * Needed to support Netflow v9 in particular. */
    uint64_t        systemInitTimeMilliseconds;     /* 120-127 */

    /* Start and end time as delta from the system init time.  Needed
     * to support Netflow v9. */
    uint32_t        flowStartSysUpTime;             /* 128-131 */
    uint32_t        flowEndSysUpTime;               /* 132-135 */

    /* start time as NTP microseconds (RFC1305); may either have end
     * time in same format or as a flowDurationMicroseconds value. */
    uint64_t        flowStartMicroseconds;          /* 136-143 */
    uint64_t        flowEndMicroseconds;            /* 144-151 */

    /* start time as NTP nanoseconds (RFC1305) */
    uint64_t        flowStartNanoseconds;           /* 152-159 */
    uint64_t        flowEndNanoseconds;             /* 160-167 */

    /* start time and end times as seconds since UNIX epoch. no
     * flowDuration field */
    uint32_t        flowStartSeconds;               /* 168-171 */
    uint32_t        flowEndSeconds;                 /* 172-175 */

    /* start time as delta (negative microsec offsets) from the export
     * time; may either have end time in same format or a
     * flowDurationMicroseconds value */
    uint32_t        flowStartDeltaMicroseconds;     /* 176-179 */
    uint32_t        flowEndDeltaMicroseconds;       /* 180-183 */

    /* elapsed time as either microsec or millisec.  used when the
     * flowEnd time is not given. */
    uint32_t        flowDurationMicroseconds;       /* 184-187 */
    uint32_t        flowDurationMilliseconds;       /* 188-191 */

    /* Collection time and Observation time */
    uint64_t        collectionTimeMilliseconds;     /* 192-199 */
    uint64_t        observationTimeMilliseconds;    /* 200-207 */
    uint64_t        observationTimeMicroseconds;    /* 208-215 */
    uint64_t        observationTimeNanoseconds;     /* 216-223 */
    uint32_t        observationTimeSeconds;         /* 224-227 */

    /* IPv4 addresses */
    uint32_t        sourceIPv4Address;              /* 228-231 */
    uint32_t        destinationIPv4Address;         /* 232-235 */
    uint32_t        ipNextHopIPv4Address;           /* 236-239 */

    /* IPv6 addresses */
    uint8_t         sourceIPv6Address[16];          /* 240-255 */
    uint8_t         destinationIPv6Address[16];     /* 256-271 */
    uint8_t         ipNextHopIPv6Address[16];       /* 272-287 */

    /* egress volume; used when ingress volume is 0 */
    uint64_t        postPacketDeltaCount;           /* 288-295 */
    uint64_t        postOctetDeltaCount;            /* 296-303 */
    uint64_t        postPacketTotalCount;           /* 304-311 */
    uint64_t        postOctetTotalCount;            /* 312-319 */

    uint8_t         flowEndReason;                  /* 320     */

    /* Flags for the reverse flow: */
    uint8_t         reverseTcpControlBits;          /* 321     */
    uint8_t         reverseInitialTCPFlags;         /* 322     */
    uint8_t         reverseUnionTCPFlags;           /* 323     */

    /* start time of reverse flow, as millisec offset from start time
     * of forward flow */
    uint32_t        reverseFlowDeltaMilliseconds;   /* 324-327 */

    uint64_t        reversePacketDeltaCount;        /* 328-335 */
    uint64_t        reverseOctetDeltaCount;         /* 336-343 */
    uint64_t        reversePacketTotalCount;        /* 344-351 */
    uint64_t        reverseOctetTotalCount;         /* 352-359 */

    /* vlan IDs (reverse) */
    uint16_t        reverseVlanId;                  /* 360-361 */
    uint16_t        reversePostVlanId;              /* 362-363 */

    /* Flow attribute flags (reverse) */
    uint16_t        reverseFlowAttributes;          /* 364-365 */

    /* padding; present only when SKI_FIXREC_PADDING is non-zero */
#if SKI_FIXREC_PADDING != 0
    uint8_t         paddingOctets[SKI_FIXREC_PADDING];/* 366-367 */
#endif

    /* TCP flags from yaf (when it is run without --silk) */
    fbSubTemplateMultiList_t stml;                  /* 368-... */
} ski_fixrec_t;
646
647
648 /*
649 * ********** YAF/SiLK Template for Import **********
650 *
651 * These are templates used for reading records we know are coming
652 * from SiLK or YAF. The templates and struct are used
653 * by ski_yafrec_next() when reading data.
654 *
655 * The type for these records is SKI_RECTYPE_YAFREC
656 */
657
658 /*
659 * There are several templates defined here. The following
660 * YAFREC_* macros determine which elements in the
 * ski_yafrec_spec[] are used. For the template that uses the
 * elements, the corresponding bits are set to high in the
 * SKI_YAFREC_TID below.
664 *
665 * On SKI_YAFREC_TID, high bits are 2, 12, 13, 14
666 */
/* bits 3-4: uniflow or biflow */
#define YAFREC_UNI          0x0008
#define YAFREC_BI           0x0010

/* bits 5-7: which IP address families are present */
#define YAFREC_ONLY_IP4     0x0020
#define YAFREC_ONLY_IP6     0x0040
#define YAFREC_IP_BOTH      0x0080

/* bits 8-9: volume as delta or as total */
#define YAFREC_DELTA        0x0100
#define YAFREC_TOTAL        0x0200

/* bit 10: the subTemplateMultiList is present */
#define YAFREC_STML         0x0400

/* The low three bits of the template ID must name the YAF record type */
#define SKI_YAFREC_TID      0x7004
#if BMAP_RECTYPE_YAFREC != (SKI_YAFREC_TID & BMAP_RECTYPE_MASK)
#  error "bad SKI_YAFREC_TID value"
#endif
683
/*
 *  Element list shared by the YAF/SiLK templates.  The third member
 *  of each entry holds the YAFREC_* bits that decide which of the
 *  templates use the entry; entries with 0 carry no restriction.
 *  Whatever combination is selected, the entries (with their
 *  padding) line up with the members of ski_yafrec_t, so the order
 *  and lengths here must stay in sync with that struct.
 */
static fbInfoElementSpec_t ski_yafrec_spec[] = {
    /* Ports, Protocol */
    { (char*)"sourceTransportPort",                2, 0 },
    { (char*)"destinationTransportPort",           2, 0 },
    { (char*)"protocolIdentifier",                 1, 0 },
    /* TCP Flags (reverse values below) */
    { (char*)"tcpControlBits",                     1, 0 },
    { (char*)"initialTCPFlags",                    1, 0 },
    { (char*)"unionTCPFlags",                      1, 0 },
    /* Router interfaces */
    { (char*)"ingressInterface",                   4, 0 },
    { (char*)"egressInterface",                    4, 0 },
    /* Volume, as either Delta or Total */
    { (char*)"packetDeltaCount",                   8, YAFREC_DELTA },
    { (char*)"octetDeltaCount",                    8, YAFREC_DELTA },
    { (char*)"packetTotalCount",                   8, YAFREC_TOTAL },
    { (char*)"octetTotalCount",                    8, YAFREC_TOTAL },
    /* Flow attributes (reverse value below) */
    { (char*)"flowAttributes",                     2, 0 },
    /* SiLK Fields */
    { (char*)"silkAppLabel",                       2, 0 },
    { (char*)"silkFlowSensor",                     2, 0 },
    { (char*)"silkFlowType",                       1, 0 },
    { (char*)"silkTCPState",                       1, 0 },
    /* Millisecond start and end (epoch) (native time) */
    { (char*)"flowStartMilliseconds",              8, 0 },
    { (char*)"flowEndMilliseconds",                8, 0 },
    /* Vlan IDs */
    { (char*)"vlanId",                             2, 0 },
    { (char*)"postVlanId",                         2, 0 },
    /* ICMP, either IPv4 or IPv6 */
    { (char*)"icmpTypeCodeIPv4",                   2, YAFREC_IP_BOTH },
    { (char*)"icmpTypeCodeIPv4",                   2, YAFREC_ONLY_IP4 },
    { (char*)"icmpTypeCodeIPv6",                   2, YAFREC_ONLY_IP6 },
    /* End reason */
    { (char*)"flowEndReason",                      1, 0 },
    /* TOS */
    { (char*)"ipClassOfService",                   1, 0 },
    /* IPv4 Addresses; if no IPv4 addresses, add 16 bytes of padding
     * (the three IPv4 addresses plus paddingOctets_1 in the struct) */
    { (char*)"paddingOctets",                     16, YAFREC_ONLY_IP6 },
    { (char*)"sourceIPv4Address",                  4, YAFREC_IP_BOTH },
    { (char*)"destinationIPv4Address",             4, YAFREC_IP_BOTH },
    { (char*)"ipNextHopIPv4Address",               4, YAFREC_IP_BOTH },
    { (char*)"sourceIPv4Address",                  4, YAFREC_ONLY_IP4 },
    { (char*)"destinationIPv4Address",             4, YAFREC_ONLY_IP4 },
    { (char*)"ipNextHopIPv4Address",               4, YAFREC_ONLY_IP4 },
    /* If both IPv4 and IPv6 addresses are present, add 4 bytes of
     * padding.  If this is an IPv4 bi-flow, add 4 bytes of padding
     * and 48 bytes of padding to skip the IPv6 addresses (52 total).
     * If an IPv4-uniflow, this is the end of the record, and pad to
     * the end: 4 + 48 + the 32 bytes of reverse members (84 total). */
    { (char*)"paddingOctets",                      4, YAFREC_IP_BOTH },
    { (char*)"paddingOctets",                     52, (YAFREC_ONLY_IP4
                                                       | YAFREC_BI) },
    { (char*)"paddingOctets",                     84, (YAFREC_ONLY_IP4
                                                       | YAFREC_UNI) },
    /* The padding above ignores the IPv6 addresses for an IPv4
     * bi-flow */
    /* IPv6 Addresses */
    { (char*)"sourceIPv6Address",                 16, YAFREC_IP_BOTH },
    { (char*)"destinationIPv6Address",            16, YAFREC_IP_BOTH },
    { (char*)"ipNextHopIPv6Address",              16, YAFREC_IP_BOTH },
    { (char*)"sourceIPv6Address",                 16, YAFREC_ONLY_IP6 },
    { (char*)"destinationIPv6Address",            16, YAFREC_ONLY_IP6 },
    { (char*)"ipNextHopIPv6Address",              16, YAFREC_ONLY_IP6 },
    /* Ignore the reverse elements for a uniflow record that is
     * either IPv6 only or has both IPv4 and IPv6 IPs. */
    { (char*)"paddingOctets",                     32, (YAFREC_IP_BOTH
                                                       | YAFREC_UNI) },
    { (char*)"paddingOctets",                     32, (YAFREC_ONLY_IP6
                                                       | YAFREC_UNI) },
    /* Volume (reverse), as either Delta or Total */
    { (char*)"reversePacketDeltaCount",            8, (YAFREC_DELTA
                                                       | YAFREC_BI) },
    { (char*)"reverseOctetDeltaCount",             8, (YAFREC_DELTA
                                                       | YAFREC_BI) },
    { (char*)"reversePacketTotalCount",            8, (YAFREC_TOTAL
                                                       | YAFREC_BI) },
    { (char*)"reverseOctetTotalCount",             8, (YAFREC_TOTAL
                                                       | YAFREC_BI) },
    /* Initial packet roundtrip */
    { (char*)"reverseFlowDeltaMilliseconds",       4, YAFREC_BI },
    /* Vlan IDs (reverse) */
    { (char*)"reverseVlanId",                      2, YAFREC_BI },
    { (char*)"reversePostVlanId",                  2, YAFREC_BI },
    /* Flow attributes (reverse) */
    { (char*)"reverseFlowAttributes",              2, YAFREC_BI },
    /* TCP Flags (reverse) */
    { (char*)"reverseTcpControlBits",              1, YAFREC_BI },
    { (char*)"reverseInitialTCPFlags",             1, YAFREC_BI },
    { (char*)"reverseUnionTCPFlags",               1, YAFREC_BI },
    /* TOS (reverse), then padding that matches paddingOctets_2 */
    { (char*)"reverseIpClassOfService",            1, YAFREC_BI },
    { (char*)"paddingOctets",                      2, YAFREC_BI },
    /* The list; it fills the 'stml' member of ski_yafrec_t */
    { (char*)"subTemplateMultiList",               0, YAFREC_STML },
    FB_IESPEC_NULL
};
779
/*
 *  Record that matches the templates built from ski_yafrec_spec[].
 *  The numbers in the trailing comments are byte offsets from the
 *  start of the record; the padding entries in the spec array keep
 *  every template variant aligned to these offsets.
 */
typedef struct ski_yafrec_st {
    uint16_t        sourceTransportPort;            /*   0-  1 */
    uint16_t        destinationTransportPort;       /*   2-  3 */

    uint8_t         protocolIdentifier;             /*   4     */
    uint8_t         tcpControlBits;                 /*   5     */
    uint8_t         initialTCPFlags;                /*   6     */
    uint8_t         unionTCPFlags;                  /*   7     */

    uint32_t        ingressInterface;               /*   8- 11 */
    uint32_t        egressInterface;                /*  12- 15 */

    /* may also hold packetTotalCount and octetTotalCount values */
    uint64_t        packetDeltaCount;               /*  16- 23 */
    uint64_t        octetDeltaCount;                /*  24- 31 */

    /* Flow attribute flags (reverse value below) */
    uint16_t        flowAttributes;                 /*  32- 33 */

    /* SiLK values */
    uint16_t        silkAppLabel;                   /*  34- 35 */
    uint16_t        silkFlowSensor;                 /*  36- 37 */
    uint8_t         silkFlowType;                   /*  38     */
    uint8_t         silkTCPState;                   /*  39     */

    /* Start and end time as milliseconds */
    uint64_t        flowStartMilliseconds;          /*  40- 47 */
    uint64_t        flowEndMilliseconds;            /*  48- 55 */

    /* vlan IDs (reverse values below) */
    uint16_t        vlanId;                         /*  56- 57 */
    uint16_t        postVlanId;                     /*  58- 59 */

    /* ICMP, may be icmpTypeCodeIPv4 or icmpTypeCodeIPv6 */
    uint16_t        icmpTypeCode;                   /*  60- 61 */
    uint8_t         flowEndReason;                  /*  62     */
    uint8_t         ipClassOfService;               /*  63     */

    /* IPv4 addresses, then padding so the IPv6 addresses start at a
     * multiple of 16 */
    uint32_t        sourceIPv4Address;              /*  64- 67 */
    uint32_t        destinationIPv4Address;         /*  68- 71 */
    uint32_t        ipNextHopIPv4Address;           /*  72- 75 */
    uint32_t        paddingOctets_1;                /*  76- 79 */

    /* IPv6 addresses */
    uint8_t         sourceIPv6Address[16];          /*  80- 95 */
    uint8_t         destinationIPv6Address[16];     /*  96-111 */
    uint8_t         ipNextHopIPv6Address[16];       /* 112-127 */

    /* may also hold reversePacketTotalCount and
     * reverseOctetTotalCount values */
    uint64_t        reversePacketDeltaCount;        /* 128-135 */
    uint64_t        reverseOctetDeltaCount;         /* 136-143 */

    /* start time of reverse flow, as millisec offset from start time
     * of forward flow */
    uint32_t        reverseFlowDeltaMilliseconds;   /* 144-147 */

    /* vlan IDs (reverse) */
    uint16_t        reverseVlanId;                  /* 148-149 */
    uint16_t        reversePostVlanId;              /* 150-151 */

    /* Flow attribute flags (reverse) */
    uint16_t        reverseFlowAttributes;          /* 152-153 */

    /* Flags for the reverse flow: */
    uint8_t         reverseTcpControlBits;          /* 154     */
    uint8_t         reverseInitialTCPFlags;         /* 155     */
    uint8_t         reverseUnionTCPFlags;           /* 156     */
    uint8_t         reverseIpClassOfService;        /* 157     */
    uint16_t        paddingOctets_2;                /* 158-159 */

    /* TCP flags from yaf (when it is run without --silk) */
    fbSubTemplateMultiList_t stml;                  /* 160-... */
} ski_yafrec_t;
853
854
855 /*
856 * ********** YAF SubTemplateMultiList TCP Info Template **********
857 *
858 * Use the following to read TCP flags that YAF has exported in an
859 * IPFIX subTemplateMultiList.
860 */
861
/* Incoming Template ID used by YAF for a subTemplateMultiList item
 * containing only forward TCP flags information. */
#define SKI_YAF_TCP_FLOW_TID    0xC003

/* Bit in the incoming Template ID that yaf sets for templates
 * containing reverse elements.  Combined with the ID above it gives
 * 0xC013. */
#define SKI_YAF_REVERSE_BIT     0x0010

/* Internal Template ID; it accompanies ski_tcp_stml_spec[] */
#define SKI_TCP_STML_TID        0xAFEC
872
/* Element list for the TCP flags template.  Each element is a single
 * octet, and the elements appear in the same order as the members of
 * ski_tcp_stml_t.  The list ends with FB_IESPEC_NULL. */
static fbInfoElementSpec_t ski_tcp_stml_spec[] = {
    /* flags of the forward flow */
    { (char*)"initialTCPFlags",                    1, 0 },
    { (char*)"unionTCPFlags",                      1, 0 },
    /* flags of the reverse flow */
    { (char*)"reverseInitialTCPFlags",             1, 0 },
    { (char*)"reverseUnionTCPFlags",               1, 0 },
    FB_IESPEC_NULL
};
880
/* In-memory form of one TCP flags record.  The four one-octet members
 * are declared in the same order as the elements of
 * ski_tcp_stml_spec[]. */
typedef struct ski_tcp_stml_st {
    uint8_t         initialTCPFlags;            /* forward, initial */
    uint8_t         unionTCPFlags;              /* forward, union */
    uint8_t         reverseInitialTCPFlags;     /* reverse, initial */
    uint8_t         reverseUnionTCPFlags;       /* reverse, union */
} ski_tcp_stml_t;
887
888
889 /*
890 * ********** YAF Statistics Options Template **********
891 *
 *    Information about the statistics records exported by YAF.  The
893 * template and structure are based on the yaf 2.3.0 manual page.
894 * The templates and struct are used by ski_yafstats_next() when
895 * reading data.
896 *
897 * The type for these records is SKI_RECTYPE_YAFSTATS
898 *
 *    These types are defined in ipfixsource.h so that they may be
 *    shared with ipfixsource.c and so that the source structure may
 *    contain them.
902 *
903 * #define SKI_YAFSTATS_TID 0xD000
904 *
905 * static fbInfoElementSpec_t ski_yafstats_spec[] = {...};
906 *
907 * typedef struct ski_yafstats_st { ... } ski_yafstats_t;
908 *
909 */
910
911
912
913 /*
914 * ********** NetFlowV9 Data Template **********
915 *
916 * Define the list of information elements and the corresponding
917 * struct for reading common NetFlowV9 records. The templates and
918 * struct are used by ski_nf9rec_next() when reading data.
919 *
920 * The type for these records is SKI_RECTYPE_NF9REC
921 */
922
/*
 *    There are several templates defined here.  The following macros
 *    determine which elements in the ski_nf9rec_spec[] are used.  For
 *    the template that uses the elements, the corresponding bits are
 *    set to high in the SKI_NF9REC_TID below.
 *
 *    On SKI_NF9REC_TID, high bits are 1, 13, 14.
 */
/* Volume elements: exactly one of these three groups is selected */
#define NF9REC_DELTA        (1 <<  2)
#define NF9REC_TOTAL        (1 <<  3)
#define NF9REC_INITIATOR    (1 <<  4)

/* Address family: selects the IPv4 or the IPv6 elements */
#define NF9REC_IP4          (1 <<  5)
#define NF9REC_IP6          (1 <<  6)

/* Time representation: SysUpTime offsets or millisecond timestamps */
#define NF9REC_SYSUP        (1 <<  7)
#define NF9REC_MILLI        (1 <<  8)

/* Base template ID; the compile-time check below verifies that its
 * record-type bits identify it as an NF9REC template */
#define SKI_NF9REC_TID      0x6002
#if (SKI_NF9REC_TID & BMAP_RECTYPE_MASK) != BMAP_RECTYPE_NF9REC
#error "bad SKI_NF9REC_TID value"
#endif
945
/*
 *    Element list for the NetFlow v9 templates.  Elements whose third
 *    value is 0 are not tied to any NF9REC_* flag; the others carry the
 *    NF9REC_* flag of the template variant they belong to.  Within
 *    each group of alternatives (volume, time, address family) the
 *    element lengths add up to the same total, so every variant fills
 *    the same offsets of ski_nf9rec_t.
 */
static fbInfoElementSpec_t ski_nf9rec_spec[] = {
    /* Ports, Protocol */
    { (char*)"sourceTransportPort",                2, 0 },
    { (char*)"destinationTransportPort",           2, 0 },
    { (char*)"protocolIdentifier",                 1, 0 },
    /* TCP Flags */
    { (char*)"tcpControlBits",                     1, 0 },
    /* End reason */
    { (char*)"flowEndReason",                      1, 0 },
    /* TOS */
    { (char*)"ipClassOfService",                   1, 0 },
    /* Router interfaces */
    { (char*)"ingressInterface",                   4, 0 },
    { (char*)"egressInterface",                    4, 0 },
    /* Volume, in one of three different ways; initiatorOctets has
     * matching responderOctets.  This does not handle OUT_BYTES(23)
     * and OUT_PACKETS(24) IEs, that fixbuf translates to reverse
     * Delta elements */
    { (char*)"packetDeltaCount",                   8, NF9REC_DELTA },
    { (char*)"octetDeltaCount",                    8, NF9REC_DELTA },
    { (char*)"postPacketDeltaCount",               8, NF9REC_DELTA },
    { (char*)"postOctetDeltaCount",                8, NF9REC_DELTA },
    { (char*)"packetTotalCount",                   8, NF9REC_TOTAL },
    { (char*)"octetTotalCount",                    8, NF9REC_TOTAL },
    { (char*)"postPacketTotalCount",               8, NF9REC_TOTAL },
    { (char*)"postOctetTotalCount",                8, NF9REC_TOTAL },
    { (char*)"initiatorPackets",                   8, NF9REC_INITIATOR},
    { (char*)"initiatorOctets",                    8, NF9REC_INITIATOR },
    { (char*)"responderPackets",                   8, NF9REC_INITIATOR},
    { (char*)"responderOctets",                    8, NF9REC_INITIATOR },
    /* SysUpTime, used to handle Netflow v9 SysUpTime offset times */
    { (char*)"systemInitTimeMilliseconds",         8, NF9REC_SYSUP },
    { (char*)"flowStartSysUpTime",                 4, NF9REC_SYSUP },
    { (char*)"flowEndSysUpTime",                   4, NF9REC_SYSUP },
    /* Millisecond start and end.  Note that end uses the
     * observationTime value, though the structure calls it
     * flowEndMilliseconds. */
    { (char*)"flowStartMilliseconds",              8, NF9REC_MILLI },
    { (char*)"observationTimeMilliseconds",        8, NF9REC_MILLI },
    /* Vlan IDs */
    { (char*)"vlanId",                             2, 0 },
    { (char*)"postVlanId",                         2, 0 },
    /* ICMP, either IPv4 or IPv6 */
    { (char*)"icmpTypeCodeIPv4",                   2, NF9REC_IP4 },
    { (char*)"icmpTypeIPv4",                       1, NF9REC_IP4 },
    { (char*)"icmpCodeIPv4",                       1, NF9REC_IP4 },
    { (char*)"icmpTypeCodeIPv6",                   2, NF9REC_IP6 },
    { (char*)"icmpTypeIPv6",                       1, NF9REC_IP6 },
    { (char*)"icmpCodeIPv6",                       1, NF9REC_IP6 },
    /* IPv4 Addresses */
    { (char*)"sourceIPv4Address",                  4, NF9REC_IP4 },
    { (char*)"destinationIPv4Address",             4, NF9REC_IP4 },
    { (char*)"ipNextHopIPv4Address",               4, NF9REC_IP4 },
    /* Pad to the firewall event: 3*(16-4)+4 */
    { (char*)"paddingOctets",                     40, NF9REC_IP4 },
    /* IPv6 Addresses */
    { (char*)"sourceIPv6Address",                 16, NF9REC_IP6 },
    { (char*)"destinationIPv6Address",            16, NF9REC_IP6 },
    { (char*)"ipNextHopIPv6Address",              16, NF9REC_IP6 },

    /* Pad the IPv6 variant to the firewall event; together with the
     * 48 octets of addresses this equals the 12+40 octets used by the
     * IPv4 variant */
    { (char*)"paddingOctets",                      4, NF9REC_IP6 },
    /* Firewall events */
    { (char*)"firewallEvent",                      1, 0 },
    { (char*)"NF_F_FW_EVENT",                      1, 0 },
    { (char*)"NF_F_FW_EXT_EVENT",                  2, 0 },
    FB_IESPEC_NULL
};
1013
/*
 *    In-memory form of a record read with one of the templates built
 *    from ski_nf9rec_spec[].  Members that hold alternative elements
 *    (volume, time, ICMP, addresses) are named after one alternative
 *    or are expressed as unions; the byte offsets in the trailing
 *    comments follow the element lengths given in the spec.
 */
typedef struct ski_nf9rec_st {
    uint16_t        sourceTransportPort;            /*   0-  1 */
    uint16_t        destinationTransportPort;       /*   2-  3 */

    uint8_t         protocolIdentifier;             /*   4     */
    uint8_t         tcpControlBits;                 /*   5     */
    uint8_t         flowEndReason;                  /*   6     */
    uint8_t         ipClassOfService;               /*   7     */

    uint32_t        ingressInterface;               /*   8- 11 */
    uint32_t        egressInterface;                /*  12- 15 */

    /* may also hold packetTotalCount and octetTotalCount,
     * initiatorPackets and initiatorOctets */
    uint64_t        packetDeltaCount;               /*  16- 23 */
    uint64_t        octetDeltaCount;                /*  24- 31 */

    /* postPacketDeltaCount and postOctetDeltaCount; or
     * postPacketTotalCount and postOctetTotalCount; or
     * responderPackets and responderOctets when the NF9REC_INITIATOR
     * bit is set */
    uint64_t        postPacketDeltaCount;           /*  32- 39 */
    uint64_t        postOctetDeltaCount;            /*  40- 47 */

    /* The two time representations occupy the same 16 octets; the
     * NF9REC_SYSUP and NF9REC_MILLI elements of the spec correspond
     * to the 'sysup' and 'milli' members */
    union nf9rec_time_un {
        /* Traditional NetFlow time uses SysUptime */
        struct nf9rec_time_sysup_st {
            uint64_t        systemInitTimeMilliseconds; /*  48- 55 */
            uint32_t        flowStartSysUpTime;         /*  56- 59 */
            uint32_t        flowEndSysUpTime;           /*  60- 63 */
        } sysup;
        /* Millisecond timestamps; per the spec, the end time is
         * filled from observationTimeMilliseconds */
        struct nf9rec_time_milli_st {
            uint64_t        flowStartMilliseconds;      /*  48- 55 */
            uint64_t        flowEndMilliseconds;        /*  56- 63 */
        } milli;
    } t;

    /* vlan IDs */
    uint16_t        vlanId;                         /*  64- 65 */
    uint16_t        postVlanId;                     /*  66- 67 */

    /* ICMP, may be icmpTypeCodeIPv4 or icmpTypeCodeIPv6 */
    uint16_t        icmpTypeCode;                   /*  68- 69 */
    /* ICMP, may be icmpTypeIPv4 or icmpTypeIPv6 */
    uint8_t         icmpType;                       /*  70     */
    /* ICMP, may be icmpCodeIPv4 or icmpCodeIPv6 */
    uint8_t         icmpCode;                       /*  71     */

    /* IPv4 and IPv6 addresses share the same starting offset; the
     * union is as large as the IPv6 member (48 octets) */
    union nf9rec_addr_un {
        struct nf9rec_ip4_st {
            uint32_t        sourceIPv4Address;          /*  72- 75 */
            uint32_t        destinationIPv4Address;     /*  76- 79 */
            uint32_t        ipNextHopIPv4Address;       /*  80- 83 */
        } ip4;
        struct nf9rec_ip6_st {
            uint8_t         sourceIPv6Address[16];      /*  72- 87 */
            uint8_t         destinationIPv6Address[16]; /*  88-103 */
            uint8_t         ipNextHopIPv6Address[16];   /* 104-119 */
        } ip6;
    } addr;

    /* receives the final 4 padding octets of either address variant */
    uint32_t        paddingOctets;                  /* 120-123 */
    /* Firewall events */
    uint8_t         firewallEvent;                  /* 124     */
    uint8_t         NF_F_FW_EVENT;                  /* 125     */
    uint16_t        NF_F_FW_EXT_EVENT;              /* 126-127 */
} ski_nf9rec_t;
1081
1082
1083 /*
1084 * ********** Tombstone Record Options Template **********
1085 *
1086 * Define the list of information elements and the corresponding
1087 * structs for reading YAF Options Template records that contain a
1088 * tombstone counter. The templates and structs are used by
1089 * ski_tombstone_next() when reading data.
1090 *
1091 * The type for these records is SKI_RECTYPE_TOMBSTONE
1092 *
1093 * The records include a subTemplateList represented by
1094 * ski_tombstone_access_spec[], ski_tombstone_access_t, and
1095 * SKI_TOMBSTONE_ACCESS_TID.
1096 */
1097
/* template id for the tombstone record itself; presumably the
 * internal id used with ski_tombstone_spec[] -- its use is outside
 * this section, TODO confirm */
#define SKI_TOMBSTONE_TID   0xAFEE

/* the internal template id; skiTemplateCallbackCtx() pairs incoming
 * tombstone access templates with this id */
#define SKI_TOMBSTONE_ACCESS_TID    0xAFE9

/* the external template id for the timestamp list */
#define SKI_YAF_TOMBSTONE_ACCESS    0xD002
1105
/* tombstoneId, exporterConfiguredId, exporterUniqueId, certToolId,
 * and tombstoneAccessList are CERT_PEN elements, IDs 550-554.
 *
 * The fixed-length elements total 24 octets and appear in the same
 * order as the members of ski_tombstone_t.  The list elements are
 * compiled in only when SKIPFIX_ENABLE_TOMBSTONE_TIMES is non-zero. */
static fbInfoElementSpec_t ski_tombstone_spec[] = {
    { (char*)"observationDomainId",         4, 0 },     /* 149 */
    { (char*)"exportingProcessId",          4, 0 },     /* 144 */
    { (char*)"exporterConfiguredId",        2, 0 },     /* CERT_PEN, 551 */
    { (char*)"exporterUniqueId",            2, 0 },     /* CERT_PEN, 552 */
    { (char*)"paddingOctets",               4, 0 },     /* 210 */
    { (char*)"tombstoneId",                 4, 0 },     /* CERT_PEN, 550 */
    { (char*)"observationTimeSeconds",      4, 0 },     /* 322 */
#if SKIPFIX_ENABLE_TOMBSTONE_TIMES
    /* a length of 0 is given for the list elements */
    { (char*)"subTemplateList",             0, 0 },     /* 292 */
#if FIXBUF_CHECK_VERSION(2,3,0)
    /* because fixbuf < 2.3.0 does not decode list-type elements
     * correctly, only use the element with fixbuf >= 2.3.0. */
    { (char*)"tombstoneAccessList",         0, 0 },     /* CERT_PEN, 554 */
#endif
#endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
    FB_IESPEC_NULL
};
1126
/* In-memory form of a tombstone record.  The members are declared in
 * the same order as the elements of ski_tombstone_spec[], and the two
 * list members are guarded by the same preprocessor conditions as the
 * corresponding spec entries. */
typedef struct ski_tombstone_st {
    uint32_t            observationDomainId;    /*  0 -  3 */
    uint32_t            exportingProcessId;     /*  4 -  7 */
    uint16_t            exporterConfiguredId;   /*  8 -  9 */
    uint16_t            exporterUniqueId;       /* 10 - 11 */
    uint32_t            paddingOctets;          /* 12 - 15 */
    uint32_t            tombstoneId;            /* 16 - 19 */
    uint32_t            observationTimeSeconds; /* 20 - 23 */
#if SKIPFIX_ENABLE_TOMBSTONE_TIMES
    /* matches the "subTemplateList" element of the spec */
    fbSubTemplateList_t stl;                    /* 24...   */
#if FIXBUF_CHECK_VERSION(2,3,0)
    /* matches the "tombstoneAccessList" element of the spec */
    fbSubTemplateList_t tombstoneAccessList;    /* ...     */
#endif
#endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
} ski_tombstone_t;
1142
1143
/* The template used by the subTemplateList.  It has three 4-octet
 * elements, in the same order as the members of
 * ski_tombstone_access_t. */
static fbInfoElementSpec_t ski_tombstone_access_spec[] = {
    { (char*)"certToolId",                  4, 0 },     /* CERT_PEN, 553 */
    { (char*)"exportingProcessId",          4, 0 },     /* 144 */
    { (char*)"observationTimeSeconds",      4, 0 },     /* 322 */
    FB_IESPEC_NULL
};
1151
/* In-memory form of one entry of the tombstone subTemplateList; the
 * members mirror ski_tombstone_access_spec[] element for element. */
typedef struct ski_tombstone_access_st {
    uint32_t            certToolId;                 /* 0 -  3 */
    uint32_t            exportingProcessId;         /* 4 -  7 */
    uint32_t            observationTimeSeconds;     /* 8 - 11 */
} ski_tombstone_access_t;
1157
1158
1159
1160 /*
1161 * ********** NetFlowV9 Sampling Options Template **********
1162 *
1163 * Define the list of information elements and the corresponding
1164 * struct for reading NetFlowV9 Options Template records that
1165 * contain sampling information. The template and struct are used
1166 * by ski_nf9sampling_next() when reading data.
1167 *
1168 * The type for these records is SKI_RECTYPE_NF9SAMPLING
1169 */
1170
/* template id for the sampling template */
#define SKI_NF9SAMPLING_TID     0xAFEF

/* number of padding octets appended to the sampling template and to
 * ski_nf9sampling_t; with the 11 octets of data this makes the record
 * 16 octets long.  The padding is omitted when this is 0. */
#define SKI_NF9SAMPLING_PADDING 5
1174
/*
 *    Element list for the sampling template.  Elements 48, 49, and 50
 *    are each listed under two names: the entries whose third value
 *    is 1 use the name marked "current fixbuf" and those whose third
 *    value is 2 use the name marked "future fixbuf".  NOTE(review):
 *    presumably the caller passes 1 or 2 as the flags when building
 *    the template so that only one name of each pair is used --
 *    confirm where the template is created.
 */
static fbInfoElementSpec_t ski_nf9sampling_spec[] = {
    { (char*)"samplingInterval",          4, 0 },    /* 34 */

    { (char*)"flowSamplerRandomInterval", 4, 1 },    /* 50, current fixbuf */
    { (char*)"samplerRandomInterval",     4, 2 },    /* 50, future fixbuf */

    { (char*)"samplingAlgorithm",         1, 0 },    /* 35 */

    { (char*)"flowSamplerMode",           1, 1 },    /* 49, current fixbuf */
    { (char*)"samplerMode",               1, 2 },    /* 49, future fixbuf */

    { (char*)"flowSamplerID",             1, 1 },    /* 48, current fixbuf */
    { (char*)"samplerId",                 1, 2 },    /* 48, future fixbuf */

#if SKI_NF9SAMPLING_PADDING != 0
    /* trailing padding; see SKI_NF9SAMPLING_PADDING */
    { (char*)"paddingOctets",             SKI_NF9SAMPLING_PADDING, 0 },
#endif
    FB_IESPEC_NULL
};
1194
/* In-memory form of a sampling record.  There is one member for each
 * element of ski_nf9sampling_spec[], counting the two alternative
 * names of an element once; the members use the "future fixbuf"
 * names. */
typedef struct ski_nf9sampling_st {
    uint32_t    samplingInterval;           /* element 34 */
    uint32_t    samplerRandomInterval;      /* element 50 */
    uint8_t     samplingAlgorithm;          /* element 35 */
    uint8_t     samplerMode;                /* element 49 */
    uint8_t     samplerId;                  /* element 48 */
#if SKI_NF9SAMPLING_PADDING != 0
    /* receives the paddingOctets element */
    uint8_t     paddingOctets[SKI_NF9SAMPLING_PADDING];
#endif
} ski_nf9sampling_t;
1205
1206
1207 /*
1208 * ********** Simple Template for Ignoring Data **********
1209 *
1210 * Simple template for reading data that is thrown away. The
1211 * template and struct are used by ski_ignore_next() when reading
1212 * data.
1213 *
1214 * The type for these records is SKI_RECTYPE_IGNORE
1215 */
1216
/* template id for the template used to read records that are thrown
 * away */
#define SKI_IGNORE_TID          0x4444

/* The template holds a single 8-octet element, which is the only
 * member of ski_ignore_t. */
static fbInfoElementSpec_t ski_ignore_spec[] = {
    { (char*)"systemInitTimeMilliseconds",         8, 0 },
    FB_IESPEC_NULL
};
1223
/* In-memory form of a record read with ski_ignore_spec[]; it holds
 * the single element named in that template. */
typedef struct ski_ignore_st {
    uint64_t        systemInitTimeMilliseconds;
} ski_ignore_t;
1227
1228
1229 /*
1230 * ********** Union Across All Record Types **********
1231 */
1232
/*
 *    The kinds of IPFIX record this file distinguishes.  A value of
 *    this type is returned by ski_rectype_next().
 *
 *    The numeric values are written out explicitly; they are the
 *    values the enumerators receive by default (0 through 7, in
 *    order), and they follow the order of the strings in
 *    ski_rectype_name[].
 */
typedef enum ski_rectype_en {
    SKI_RECTYPE_ERROR       = 0,
    SKI_RECTYPE_FIXREC      = 1,
    SKI_RECTYPE_YAFREC      = 2,
    SKI_RECTYPE_NF9REC      = 3,
    SKI_RECTYPE_YAFSTATS    = 4,
    SKI_RECTYPE_TOMBSTONE   = 5,
    SKI_RECTYPE_NF9SAMPLING = 6,
    SKI_RECTYPE_IGNORE      = 7
} ski_rectype_t;
1244
/* Printable names of the ski_rectype_t values, compiled only when
 * TRACEMSG_LEVEL is at least 2.  The strings are listed in the same
 * order as the enumerators of ski_rectype_t; presumably the array is
 * indexed by a ski_rectype_t value, so keep the two in sync. */
#if TRACEMSG_LEVEL >= 2
static const char *ski_rectype_name[] = {
    "SKI_RECTYPE_ERROR",
    "SKI_RECTYPE_FIXREC",
    "SKI_RECTYPE_YAFREC",
    "SKI_RECTYPE_NF9REC",
    "SKI_RECTYPE_YAFSTATS",
    "SKI_RECTYPE_TOMBSTONE",
    "SKI_RECTYPE_NF9SAMPLING",
    "SKI_RECTYPE_IGNORE"
};
#endif
1257
/*
 *    Holds one IPFIX record together with the information that
 *    describes how it was read.  The 'data' union has one member for
 *    each record type named by ski_rectype_t (other than
 *    SKI_RECTYPE_ERROR).
 */
struct ski_record_st {
    /* Template used to read this record */
    fbTemplate_t       *tmpl;
    /* The bitmap value that is tmpl's context */
    BMAP_TYPE           bmap;
    /* The ID of tmpl */
    uint16_t            tid;
    /* The record type that is decided from the bitmap */
    ski_rectype_t       rectype;
    /* A pointer to the forward rwRec to be filled */
    rwRec              *fwd_rec;
    /* A pointer to the reverse rwRec to be filled */
    rwRec              *rev_rec;
    /* The IPFIX record */
    union data_un {
        ski_fixrec_t        fixrec;         /* SKI_RECTYPE_FIXREC */
        ski_yafrec_t        yafrec;         /* SKI_RECTYPE_YAFREC */
        ski_nf9rec_t        nf9rec;         /* SKI_RECTYPE_NF9REC */
        ski_yafstats_t      yafstats;       /* SKI_RECTYPE_YAFSTATS */
        ski_tombstone_t     tombstone;      /* SKI_RECTYPE_TOMBSTONE */
        ski_nf9sampling_t   nf9sampling;    /* SKI_RECTYPE_NF9SAMPLING */
        ski_ignore_t        ignore;         /* SKI_RECTYPE_IGNORE */
    } data;
};
typedef struct ski_record_st ski_record_t;
1283
1284
1285
1286 /*
1287 * ********** FUNCTION DEFINITIONS **********
1288 */
1289
/*
 *    Helpers for building the prefix that begins each TRACEMSG().
 *
 *    When TRACEMSG_LEVEL is 2 or greater, DEFINE_PREFIX_BUF(varname)
 *    declares a 512 character array named 'varname', and
 *    makeTracemsgPrefix() is a function that formats the probe name,
 *    template id, domain, and template pointer into the caller's
 *    buffer and returns that buffer.
 *
 *    At lower trace levels both names are macros that expand to
 *    nothing.
 *
 *    To avoid a C89 compiler warning, put DEFINE_PREFIX_BUF() after
 *    other variable definitions but before any other code.
 */
#if TRACEMSG_LEVEL >= 2

#define DEFINE_PREFIX_BUF(varname)      char varname [512]

static char *
makeTracemsgPrefix(
    char                   *buf,
    size_t                  buflen,
    const char             *probe_name,
    uint32_t                domain,
    uint16_t                tid,
    const fbTemplate_t     *tmpl)
{
    /* the output is silently truncated when 'buf' is too small */
    (void)snprintf(buf, buflen, "'%s': Template %#06x, domain %#x, [%p]",
                   probe_name, tid, domain, (void *)tmpl);
    return buf;
}

#else  /* TRACEMSG_LEVEL < 2 */

#define DEFINE_PREFIX_BUF(varname)
#define makeTracemsgPrefix(_a, _b, _c, _d, _e, _f)

#endif  /* TRACEMSG_LEVEL */
1316
1317 /*
1318 * The skiTemplateCallbackCtx() callback is invoked whenever the
1319 * session receives a new template. This function must have the
1320 * signature defined by a typedef defined by libfixbuf.
1321 *
1322 * In fixbuf 2.x, the callback is set by calling
1323 * fbSessionAddNewTemplateCallback() and its signature is given by
1324 * 'fbNewTemplateCallback_fn'. In fixbuf 1.x, it is set by calling
1325 * fbSessionAddTemplateCtxCallback2() and its signature is
1326 * 'fbTemplateCtxCallback2_fn'.
1327 *
1328 * One purpose of the callback is the tell fixbuf how to process
1329 * items in a subTemplateMultiList. We tell fixbuf to map from
1330 * the two templates that yaf uses for TCP flags (one of which has
1331 * reverse elements and one of which does not) to the struct used
1332 * in this file.
1333 *
1334 * The callback also examines the template and sets a context
1335 * pointer that contains high bits for certain information
1336 * elements. See the detailed comment above the "struct elem_st"
1337 * definition.
1338 *
1339 * Finally, if the probe has the SOURCE_LOG_TEMPLATES flag set or
1340 * the global `show_templates` is true (the environment variable
1341 * named by SKI_ENV_PRINT_TEMPLATES controls this), the templates
1342 * are printed to the log file. (`show_templates` is defined in
1343 * probeconf.c.)
1344 */
1345 static void
skiTemplateCallbackCtx(fbSession_t * session,uint16_t tid,fbTemplate_t * tmpl,void * app_ctx,void ** ctx,fbTemplateCtxFree_fn * ctx_free_fn)1346 skiTemplateCallbackCtx(
1347 fbSession_t *session,
1348 uint16_t tid,
1349 fbTemplate_t *tmpl,
1350 void *app_ctx,
1351 void **ctx,
1352 #if FIXBUF_CHECK_VERSION(2,0,0)
1353 fbTemplateCtxFree_fn
1354 #else
1355 fbTemplateCtxFree2_fn
1356 #endif
1357 *ctx_free_fn)
1358 {
1359 #define TMPL_PROC_MSG2(is_option, wp2_tmpl_name) \
1360 DEBUGMSG(("'%s': Processing " is_option "template %#06x (%u)," \
1361 " domain %#x, with the %s template"), \
1362 name, tid, tid, domain, (wp2_tmpl_name))
1363
1364 #define TMPL_PROC_MSG(wp_name) TMPL_PROC_MSG2("", wp_name)
1365 #define TMPL_PROC_MSG_OPT(wp_name) TMPL_PROC_MSG2("options ", wp_name)
1366
1367 fbCollector_t *coll;
1368 const fbInfoElement_t *ie;
1369 const char *name;
1370 int show_tmpl;
1371 BMAP_TYPE out;
1372 uint64_t bmap;
1373 uint32_t domain;
1374 uint32_t count;
1375 uint32_t scope;
1376 uint32_t i;
1377 int known_id;
1378 DEFINE_PREFIX_BUF(prefix);
1379
1380 TRACE_ENTRY;
1381 SK_UNUSED_PARAM(app_ctx);
1382
1383 *ctx = NULL;
1384 *ctx_free_fn = NULL;
1385
1386 domain = fbSessionGetDomain(session);
1387 count = fbTemplateCountElements(tmpl);
1388 scope = fbTemplateGetOptionsScope(tmpl);
1389 bmap = 0;
1390 out = 0;
1391
1392 coll = fbSessionGetCollector(session);
1393 if (NULL == coll) {
1394 name = "<udp>";
1395 show_tmpl = show_templates;
1396 } else {
1397 const skIPFIXConnection_t *conn;
1398 const skIPFIXSource_t *source;
1399 conn = (skIPFIXConnection_t *)fbCollectorGetContext(coll);
1400 source = conn->source;
1401 name = source->name;
1402 show_tmpl = skpcProbeGetLogFlags(source->probe) & SOURCE_LOG_TEMPLATES;
1403 }
1404
1405 makeTracemsgPrefix(prefix, sizeof(prefix), name, domain, tid,tmpl);
1406 TRACEMSG(2, ("%s skiTemplateCallbackCtx()", prefix));
1407
1408 if (scope) {
1409 unsigned int samplingAlgorithm;
1410 unsigned int samplerMode;
1411
1412 /* do not define any template pairs for this template */
1413 fbSessionAddTemplatePair(session, tid, 0);
1414
1415 /* the sampling check requires multiple elements */
1416 samplingAlgorithm = samplerMode = 0;
1417
1418 for (i = 0; i < count && (ie = fbTemplateGetIndexedIE(tmpl, i)); ++i) {
1419 if (ie->ent == 0) {
1420 switch (ie->num) {
1421 case 34:
1422 case 35:
1423 /* verify that both samplingInterval and
1424 * samplingAlgorithm are present */
1425 ++samplingAlgorithm;
1426 if (2 == samplingAlgorithm) {
1427 bmap |= (1 | TMPL_BIT_samplingAlgorithm);
1428 }
1429 break;
1430 case 49:
1431 case 50:
1432 /* verify that both samplerMode and
1433 * samplerRandomInterval are present */
1434 ++samplerMode;
1435 if (2 == samplerMode) {
1436 bmap |= (1 | TMPL_BIT_samplerMode);
1437 }
1438 break;
1439 }
1440 } else if (ie->ent == IPFIX_CERT_PEN) {
1441 /* CERT PRIVATE ENTERPRISE ELEMENTS */
1442 switch (ie->num) {
1443 case 104:
1444 ASSERT_IE_NAME_IS(ie, flowTableFlushEventCount);
1445 bmap |= (1 | TMPL_BIT_flowTableFlushEventCount);
1446 break;
1447 case 105:
1448 ASSERT_IE_NAME_IS(ie, flowTablePeakCount);
1449 bmap |= (1 | TMPL_BIT_flowTablePeakCount);
1450 break;
1451 case 550:
1452 ASSERT_IE_NAME_IS(ie, tombstoneId);
1453 bmap |= (1 | TMPL_BIT_tombstoneId);
1454 break;
1455 }
1456 }
1457 TRACEMSG(
1458 3, ("%s bmap %#012" PRIx64 ", IE %s (%u/%u)",
1459 prefix, bmap, ie->ref.canon->ref.name, ie->ent, ie->num));
1460 }
1461 if (bmap) {
1462 out = (BMAP_TYPE)bmap;
1463 BMAP_TMPL_CTX_SET(ctx, ctx_free_fn, out);
1464 }
1465 TMPL_PROC_MSG_OPT(((bmap & (TMPL_BIT_flowTableFlushEventCount
1466 | TMPL_BIT_flowTablePeakCount))
1467 ? "YAFstats"
1468 : ((bmap & (TMPL_BIT_tombstoneId))
1469 ? "tombstone"
1470 : ((bmap & (TMPL_BIT_samplingAlgorithm
1471 | TMPL_BIT_samplerMode))
1472 ? "sampling"
1473 : "ignore"))));
1474
1475 } else {
1476 /* populate the bitmap */
1477 for (i = 0; i < count && (ie = fbTemplateGetIndexedIE(tmpl, i)); ++i) {
1478 if (ie->ent == 0) {
1479 /* STANDARD ELEMENT */
1480 switch (ie->num) {
1481 case 8:
1482 case 12:
1483 /* sourceIPv4Address and/or destinationIPv4Address */
1484 bmap |= TMPL_BIT_sourceIPv4Address;
1485 break;
1486 case 27:
1487 case 28:
1488 /* sourceIPv6Address and/or destinationIPv6Address */
1489 bmap |= TMPL_BIT_sourceIPv6Address;
1490 break;
1491 case 1:
1492 /* octetDeltaCount */
1493 bmap |= TMPL_BIT_octetDeltaCount;
1494 break;
1495 case 2:
1496 /* packetDeltaCount */
1497 bmap |= TMPL_BIT_packetDeltaCount;
1498 break;
1499 case 85:
1500 /* octetTotalCount */
1501 bmap |= TMPL_BIT_octetTotalCount;
1502 break;
1503 case 86:
1504 /* packetTotalCount */
1505 bmap |= TMPL_BIT_packetTotalCount;
1506 break;
1507 case 23:
1508 case 24:
1509 /* postOctetDeltaCount and/or postPacketDeltaCount */
1510 bmap |= TMPL_BIT_postOctetDeltaCount;
1511 break;
1512 case 171:
1513 case 172:
1514 /* postOctetTotalCount and/or postPacketTotalCount */
1515 bmap |= TMPL_BIT_postOctetTotalCount;
1516 break;
1517 case 32:
1518 case 139:
1519 /* icmpTypeCodeIPv4 and/or icmpTypeCodeIPv6 */
1520 bmap |= TMPL_BIT_icmpTypeCodeIPv4;
1521 break;
1522 case 176:
1523 case 177:
1524 case 178:
1525 case 179:
1526 /* icmpTypeIPv4, icmpCodeIPv4, icmpTypeIPv6, and
1527 * icmpCodeIPv6 all map to same position */
1528 bmap |= TMPL_BIT_icmpTypeIPv4;
1529 break;
1530 case 231:
1531 case 298:
1532 /* initiatorOctets and/or initiatorPackets */
1533 bmap |= TMPL_BIT_initiatorOctets;
1534 break;
1535 case 232:
1536 case 299:
1537 /* responderOctets and/or responderPackets */
1538 bmap |= TMPL_BIT_responderOctets;
1539 break;
1540
1541 case 22:
1542 ASSERT_IE_NAME_IS(ie, flowStartSysUpTime);
1543 bmap |= TMPL_BIT_flowStartSysUpTime;
1544 break;
1545 case 59:
1546 ASSERT_IE_NAME_IS(ie, postVlanId);
1547 bmap |= TMPL_BIT_postVlanId;
1548 break;
1549 case 144:
1550 ASSERT_IE_NAME_IS(ie, exportingProcessId);
1551 bmap |= TMPL_BIT_exportingProcessId;
1552 break;
1553 case 150:
1554 ASSERT_IE_NAME_IS(ie, flowStartSeconds);
1555 bmap |= TMPL_BIT_flowStartSeconds;
1556 break;
1557 case 151:
1558 ASSERT_IE_NAME_IS(ie, flowEndSeconds);
1559 bmap |= TMPL_BIT_flowEndSeconds;
1560 break;
1561 case 152:
1562 ASSERT_IE_NAME_IS(ie, flowStartMilliseconds);
1563 bmap |= TMPL_BIT_flowStartMilliseconds;
1564 break;
1565 case 153:
1566 ASSERT_IE_NAME_IS(ie, flowEndMilliseconds);
1567 bmap |= TMPL_BIT_flowEndMilliseconds;
1568 break;
1569 case 154:
1570 ASSERT_IE_NAME_IS(ie, flowStartMicroseconds);
1571 bmap |= TMPL_BIT_flowStartMicroseconds;
1572 break;
1573 case 155:
1574 ASSERT_IE_NAME_IS(ie, flowEndMicroseconds);
1575 bmap |= TMPL_BIT_flowEndMicroseconds;
1576 break;
1577 case 156:
1578 ASSERT_IE_NAME_IS(ie, flowStartNanoseconds);
1579 bmap |= TMPL_BIT_flowStartNanoseconds;
1580 break;
1581 case 157:
1582 ASSERT_IE_NAME_IS(ie, flowEndNanoseconds);
1583 bmap |= TMPL_BIT_flowEndNanoseconds;
1584 break;
1585 case 158:
1586 ASSERT_IE_NAME_IS(ie, flowStartDeltaMicroseconds);
1587 bmap |= TMPL_BIT_flowStartDeltaMicroseconds;
1588 break;
1589 case 159:
1590 ASSERT_IE_NAME_IS(ie, flowEndDeltaMicroseconds);
1591 bmap |= TMPL_BIT_flowEndDeltaMicroseconds;
1592 break;
1593 case 160:
1594 ASSERT_IE_NAME_IS(ie, systemInitTimeMilliseconds);
1595 bmap |= TMPL_BIT_systemInitTimeMilliseconds;
1596 break;
1597 case 161:
1598 ASSERT_IE_NAME_IS(ie, flowDurationMilliseconds);
1599 bmap |= TMPL_BIT_flowDurationMilliseconds;
1600 break;
1601 case 162:
1602 ASSERT_IE_NAME_IS(ie, flowDurationMicroseconds);
1603 bmap |= TMPL_BIT_flowDurationMicroseconds;
1604 break;
1605 case 233:
1606 ASSERT_IE_NAME_IS(ie, firewallEvent);
1607 bmap |= TMPL_BIT_firewallEvent;
1608 break;
1609 case 258:
1610 ASSERT_IE_NAME_IS(ie, collectionTimeMilliseconds);
1611 bmap |= TMPL_BIT_collectionTimeMilliseconds;
1612 break;
1613 case 293:
1614 ASSERT_IE_NAME_IS(ie, subTemplateMultiList);
1615 bmap |= TMPL_BIT_subTemplateMultiList;
1616 break;
1617 case 322:
1618 ASSERT_IE_NAME_IS(ie, observationTimeSeconds);
1619 bmap |= TMPL_BIT_observationTimeSeconds;
1620 break;
1621 case 323:
1622 ASSERT_IE_NAME_IS(ie, observationTimeMilliseconds);
1623 bmap |= TMPL_BIT_observationTimeMilliseconds;
1624 break;
1625 case 324:
1626 ASSERT_IE_NAME_IS(ie, observationTimeMicroseconds);
1627 bmap |= TMPL_BIT_observationTimeMicroseconds;
1628 break;
1629 case 325:
1630 ASSERT_IE_NAME_IS(ie, observationTimeNanoseconds);
1631 bmap |= TMPL_BIT_observationTimeNanoseconds;
1632 break;
1633
1634 case FB_CISCO_ASA_EVENT_XTRA:
1635 ASSERT_IE_NAME_IS(ie, NF_F_FW_EXT_EVENT);
1636 bmap |= TMPL_BIT_NF_F_FW_EXT_EVENT;
1637 break;
1638 case FB_CISCO_ASA_EVENT_ID:
1639 ASSERT_IE_NAME_IS(ie, NF_F_FW_EVENT);
1640 bmap |= TMPL_BIT_NF_F_FW_EVENT;
1641 break;
1642 }
1643 } else if (ie->ent == FB_IE_PEN_REVERSE) {
1644 /* REVERSE VALUE OF STANDARD ELEMENTS */
1645 switch (ie->num) {
1646 case 1:
1647 case 2:
1648 /* reverseOctetDeltaCount and/or
1649 * reversePacketDeltaCount; for NetFlow v9 records
1650 * they may hold post{Octet,Packet}DeltaCount when
1651 * using libfixbuf prior to 1.8.0 */
1652 bmap |= TMPL_BIT_reverseOctetDeltaCount;
1653 break;
1654 case 6:
1655 ASSERT_IE_NAME_IS(ie, reverseTcpControlBits);
1656 bmap |= TMPL_BIT_reverseTcpControlBits;
1657 break;
1658 case 58:
1659 ASSERT_IE_NAME_IS(ie, reverseVlanId);
1660 bmap |= TMPL_BIT_reverseVlanId;
1661 break;
1662 }
1663 } else if (ie->ent == IPFIX_CERT_PEN) {
1664 /* CERT PRIVATE ENTERPRISE ELEMENTS */
1665 switch (ie->num) {
1666 case 14:
1667 ASSERT_IE_NAME_IS(ie, initialTCPFlags);
1668 bmap |= TMPL_BIT_initialTCPFlags;
1669 break;
1670 case 14 | FB_IE_VENDOR_BIT_REVERSE:
1671 ASSERT_IE_NAME_IS(ie, reverseInitialTCPFlags);
1672 bmap |= TMPL_BIT_reverseInitialTCPFlags;
1673 break;
1674 case 21:
1675 ASSERT_IE_NAME_IS(ie, reverseFlowDeltaMilliseconds);
1676 bmap |= TMPL_BIT_reverseFlowDeltaMilliseconds;
1677 break;
1678 case 553:
1679 ASSERT_IE_NAME_IS(ie, certToolId);
1680 bmap |= TMPL_BIT_certToolId;
1681 break;
1682 }
1683 }
1684 TRACEMSG(
1685 3, ("%s bmap %#012" PRIx64 ", IE %s (%u/%u)",
1686 prefix, bmap, ie->ref.canon->ref.name, ie->ent, ie->num));
1687 }
1688
1689 /* now that the bitmap is populated, see if it matches some
1690 * expected patterns */
1691
1692 /* tell fixbuf how to transcode templates that appear in lists */
1693 if (bmap == TMPL_BIT_initialTCPFlags
1694 || bmap == (TMPL_BIT_initialTCPFlags
1695 | TMPL_BIT_reverseInitialTCPFlags))
1696 {
1697 /* the template ID matches the ID for the YAF template
1698 * that contains TCP flags */
1699 fbSessionAddTemplatePair(session, tid, SKI_TCP_STML_TID);
1700 TMPL_PROC_MSG("YAF TCP flags list");
1701 known_id = 1;
1702 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
1703 } else if ((bmap & TMPL_BIT_certToolId)
1704 || ((bmap == (TMPL_BIT_exportingProcessId
1705 | TMPL_BIT_observationTimeSeconds)
1706 && count == 2)))
1707 {
1708 /* the template ID matches the ID for the template that
1709 * contains tombstone timestamps */
1710 fbSessionAddTemplatePair(session, tid, SKI_TOMBSTONE_ACCESS_TID);
1711 TMPL_PROC_MSG("tombstone access");
1712 known_id = 1;
1713 #endif /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
1714 } else {
1715 /* do not define any template pairs for this template */
1716 fbSessionAddTemplatePair(session, tid, 0);
1717 known_id = 0;
1718 /* clear the exportingProcessId bit */
1719 bmap &= ~TMPL_BIT_exportingProcessId;
1720 }
1721
1722 /* check whether the template may be processed by the YAF
1723 * template by: not using any IEs outside of those defined the
1724 * YAF template, by having IP addresses, by using millisecond
1725 * times, and by having consistent IEs for volume */
1726 if (known_id) {
1727 /* no more tests are needed */
1728 } else if (0 == (bmap & ~TMPL_MASK_YAFREC)
1729 && (bmap & TMPL_MASK_IPADDRESS)
1730 && (bmap & TMPL_MASK_TIME_MILLI_YAF)
1731 && (((bmap & TMPL_MASK_VOLUME_YAF)
1732 == (TMPL_BIT_octetDeltaCount | TMPL_BIT_packetDeltaCount))
1733 || ((bmap & TMPL_MASK_VOLUME_YAF)
1734 ==(TMPL_BIT_octetTotalCount | TMPL_BIT_packetTotalCount))))
1735 {
1736 /* Which volume element is present? */
1737 if ((bmap & TMPL_MASK_VOLUME_YAF)
1738 == (TMPL_BIT_octetDeltaCount | TMPL_BIT_packetDeltaCount))
1739 {
1740 out |= YAFREC_DELTA;
1741 } else {
1742 assert((TMPL_BIT_octetTotalCount | TMPL_BIT_packetTotalCount)
1743 == (bmap & TMPL_MASK_VOLUME_YAF));
1744 out |= YAFREC_TOTAL;
1745 }
1746 /* Which IP addresses are present? */
1747 if ((bmap & TMPL_MASK_IPADDRESS) == TMPL_MASK_IPADDRESS) {
1748 /* Both are */
1749 out |= YAFREC_IP_BOTH;
1750 } else if (bmap & TMPL_BIT_sourceIPv6Address) {
1751 out |= YAFREC_ONLY_IP6;
1752 } else {
1753 assert(bmap & TMPL_BIT_sourceIPv4Address);
1754 out |= YAFREC_ONLY_IP4;
1755 }
1756
1757 /* Are TCP flags available without visiting the STML? */
1758 if ((0 == (bmap & TMPL_BIT_initialTCPFlags))
1759 && (bmap & TMPL_BIT_subTemplateMultiList))
1760 {
1761 out |= YAFREC_STML;
1762 }
1763 /* Is it a uniflow or a bi flow? */
1764 if (bmap & TMPL_BIT_reverseFlowDeltaMilliseconds) {
1765 out |= YAFREC_BI;
1766 } else {
1767 out |= YAFREC_UNI;
1768 }
1769 out |= (SKI_YAFREC_TID
1770 | (bmap & (TMPL_BIT_reverseVlanId |
1771 TMPL_BIT_reverseTcpControlBits |
1772 TMPL_BIT_reverseInitialTCPFlags |
1773 TMPL_BIT_icmpTypeCodeIPv4)));
1774 BMAP_TMPL_CTX_SET(ctx, ctx_free_fn, out);
1775 TMPL_PROC_MSG("YAF");
1776
1777 /* check whether the template may be processed by the NetFlow
1778 * v9 template by not having any IEs outside of that set */
1779 } else if ((0 == (bmap & ~TMPL_MASK_NF9REC))
1780 && (bmap & TMPL_MASK_IPADDRESS))
1781 {
1782 /* this do{}while(0) is not a loop, it is just something that
1783 * "break;" works with */
1784 do {
1785 /* Which IP addresses are present? */
1786 if ((bmap & TMPL_MASK_IPADDRESS)==TMPL_BIT_sourceIPv6Address){
1787 out |= NF9REC_IP6;
1788 } else if ((bmap & TMPL_MASK_IPADDRESS)
1789 == TMPL_BIT_sourceIPv4Address)
1790 {
1791 out |= NF9REC_IP4;
1792 } else {
1793 /* cannot use the nf9rec template */
1794 out = 0;
1795 break;
1796 }
1797
1798 /* Which time is present */
1799 if ((bmap & TMPL_MASK_TIME_NF9)== TMPL_MASK_TIME_SYSUP) {
1800 out |= NF9REC_SYSUP;
1801 } else if (((bmap & TMPL_MASK_TIME_NF9)
1802 == TMPL_MASK_TIME_MILLI_NF9)
1803 || ((bmap & TMPL_MASK_TIME_NF9)
1804 == TMPL_BIT_observationTimeMilliseconds))
1805 {
1806 out |= NF9REC_MILLI;
1807 } else {
1808 /* cannot use the nf9rec template */
1809 out = 0;
1810 break;
1811 }
1812
1813 /* Which volume is present */
1814 if (((bmap & TMPL_MASK_VOLUME_NF9)
1815 & (TMPL_BIT_initiatorOctets
1816 | TMPL_BIT_responderOctets))
1817 && 0 == ((bmap & TMPL_MASK_VOLUME_NF9)
1818 & ~(TMPL_BIT_initiatorOctets
1819 | TMPL_BIT_responderOctets)))
1820 {
1821 out |= NF9REC_INITIATOR;
1822 } else if (((bmap & TMPL_MASK_VOLUME_NF9)
1823 & (TMPL_BIT_octetDeltaCount
1824 | TMPL_BIT_packetDeltaCount
1825 | TMPL_BIT_postOctetDeltaCount))
1826 && 0 == ((bmap & TMPL_MASK_VOLUME_NF9)
1827 & ~(TMPL_BIT_octetDeltaCount
1828 | TMPL_BIT_packetDeltaCount
1829 | TMPL_BIT_postOctetDeltaCount)))
1830 {
1831 out |= NF9REC_DELTA;
1832 } else if (((bmap & TMPL_MASK_VOLUME_NF9)
1833 & (TMPL_BIT_octetTotalCount
1834 | TMPL_BIT_packetTotalCount
1835 | TMPL_BIT_postOctetTotalCount))
1836 && 0 == ((bmap & TMPL_MASK_VOLUME_NF9)
1837 & ~(TMPL_BIT_octetTotalCount
1838 | TMPL_BIT_packetTotalCount
1839 | TMPL_BIT_postOctetTotalCount)))
1840 {
1841 out |= NF9REC_TOTAL;
1842 } else if (((bmap & TMPL_MASK_VOLUME_NF9) == 0)
1843 && (bmap & (TMPL_BIT_firewallEvent
1844 | TMPL_BIT_NF_F_FW_EVENT)))
1845 {
1846 out |= NF9REC_INITIATOR;
1847 } else {
1848 /* cannot use the nf9rec template */
1849 out = 0;
1850 break;
1851 }
1852
1853 out |= (SKI_NF9REC_TID
1854 | (bmap & (TMPL_BIT_icmpTypeCodeIPv4 |
1855 TMPL_BIT_icmpTypeIPv4 |
1856 TMPL_BIT_firewallEvent |
1857 TMPL_BIT_NF_F_FW_EVENT |
1858 TMPL_BIT_NF_F_FW_EXT_EVENT)));
1859 BMAP_TMPL_CTX_SET(ctx, ctx_free_fn, out);
1860 TMPL_PROC_MSG("NFv9");
1861 } while (0);
1862 }
1863
1864 if (*ctx != NULL || known_id != 0) {
1865 /* template is already handled */
1866 } else if (bmap) {
1867 out = 1 | (BMAP_TYPE)bmap;
1868 BMAP_TMPL_CTX_SET(ctx, ctx_free_fn, out);
1869 TMPL_PROC_MSG("generic");
1870 } else {
1871 TMPL_PROC_MSG("ignore");
1872 }
1873 }
1874
1875 #if TRACEMSG_LEVEL >= 2
1876 if (*ctx) {
1877 TRACEMSG(2, ("%s bmap " BMAP_PRI ", written", prefix, out));
1878 }
1879 #endif
1880
1881 if (show_tmpl) {
1882 char buf[0x4000];
1883 char *b = buf;
1884 ssize_t rem = sizeof(buf);
1885 ssize_t sz;
1886
1887 *b = '\0';
1888 for (i = 0;
1889 i < count && (ie = fbTemplateGetIndexedIE(tmpl, i)) && rem > 2;
1890 ++i)
1891 {
1892 assert(b < (buf + sizeof(buf)));
1893 if (0 == ie->ent) {
1894 sz = snprintf(b, rem, "%s %s(%u)[%u]%s",
1895 ((i > 0) ? "," : ""),
1896 ie->ref.canon->ref.name, ie->num, ie->len,
1897 ((i < scope) ? "{scope}" : ""));
1898 } else {
1899 sz = snprintf(b, rem, "%s %s(%u/%u)[%u]%s",
1900 ((i > 0) ? "," : ""),
1901 ie->ref.canon->ref.name, ie->ent, ie->num,
1902 ie->len, ((i < scope) ? "{scope}" : ""));
1903 }
1904 rem -= sz;
1905 b += sz;
1906 }
1907 INFOMSG(("'%s': Contents of %stemplate %#06x (%u),"
1908 " domain %#x, %" PRIu32 " elements:%s"),
1909 name, ((scope) ? "options " : ""), tid, tid,
1910 domain, count, buf);
1911 }
1912 }
1913
1914
1915 /*
1916 * Initialize an fbSession object that reads from either the
1917 * network or from a file.
1918 *
1919 * This function updates the fbSession object with (1) the
1920 * received-new-template callback function and (2) all the
1921 * templates used when transcoding the incoming data.
1922 */
1923 int
skiSessionInitReader(fbSession_t * session,GError ** err)1924 skiSessionInitReader(
1925 fbSession_t *session,
1926 GError **err)
1927 {
1928 fbInfoModel_t *model;
1929 fbTemplate_t *tmpl;
1930 const uint32_t yafrec_spec_flags[] = {
1931 /* exported by YAF, either biflow or uniflow, either IPv4 or
1932 * IPv6 (never both), either delta count or total count, with
1933 * or without an STML */
1934 YAFREC_BI | YAFREC_ONLY_IP4 | YAFREC_DELTA | YAFREC_STML,
1935 YAFREC_BI | YAFREC_ONLY_IP4 | YAFREC_TOTAL | YAFREC_STML,
1936 YAFREC_BI | YAFREC_ONLY_IP6 | YAFREC_DELTA | YAFREC_STML,
1937 YAFREC_BI | YAFREC_ONLY_IP6 | YAFREC_TOTAL | YAFREC_STML,
1938
1939 YAFREC_UNI | YAFREC_ONLY_IP4 | YAFREC_DELTA | YAFREC_STML,
1940 YAFREC_UNI | YAFREC_ONLY_IP4 | YAFREC_TOTAL | YAFREC_STML,
1941 YAFREC_UNI | YAFREC_ONLY_IP6 | YAFREC_DELTA | YAFREC_STML,
1942 YAFREC_UNI | YAFREC_ONLY_IP6 | YAFREC_TOTAL | YAFREC_STML,
1943
1944 YAFREC_BI | YAFREC_ONLY_IP4 | YAFREC_DELTA,
1945 YAFREC_BI | YAFREC_ONLY_IP4 | YAFREC_TOTAL,
1946 YAFREC_BI | YAFREC_ONLY_IP6 | YAFREC_DELTA,
1947 YAFREC_BI | YAFREC_ONLY_IP6 | YAFREC_TOTAL,
1948
1949 YAFREC_UNI | YAFREC_ONLY_IP4 | YAFREC_DELTA,
1950 YAFREC_UNI | YAFREC_ONLY_IP4 | YAFREC_TOTAL,
1951 YAFREC_UNI | YAFREC_ONLY_IP6 | YAFREC_DELTA,
1952 YAFREC_UNI | YAFREC_ONLY_IP6 | YAFREC_TOTAL,
1953
1954 /* created by SiLK; always uniflow, never with an STML, always
1955 * delta count; may have both IPv4 and IPv6 */
1956 YAFREC_UNI | YAFREC_DELTA | YAFREC_IP_BOTH,
1957 0 /* sentinal */
1958 };
1959 const uint32_t nf9rec_spec_flags[] = {
1960 NF9REC_IP4 | NF9REC_SYSUP | NF9REC_DELTA,
1961 NF9REC_IP4 | NF9REC_MILLI | NF9REC_DELTA,
1962 NF9REC_IP6 | NF9REC_SYSUP | NF9REC_DELTA,
1963 NF9REC_IP6 | NF9REC_MILLI | NF9REC_DELTA,
1964
1965 NF9REC_IP4 | NF9REC_SYSUP | NF9REC_TOTAL,
1966 NF9REC_IP4 | NF9REC_MILLI | NF9REC_TOTAL,
1967 NF9REC_IP6 | NF9REC_SYSUP | NF9REC_TOTAL,
1968 NF9REC_IP6 | NF9REC_MILLI | NF9REC_TOTAL,
1969
1970 NF9REC_IP4 | NF9REC_SYSUP | NF9REC_INITIATOR,
1971 NF9REC_IP4 | NF9REC_MILLI | NF9REC_INITIATOR,
1972 NF9REC_IP6 | NF9REC_SYSUP | NF9REC_INITIATOR,
1973 NF9REC_IP6 | NF9REC_MILLI | NF9REC_INITIATOR,
1974
1975 0 /* sentinal */
1976 };
1977 uint32_t i;
1978 uint16_t tid;
1979
1980
1981 /* assert that we are not replacing an existing template */
1982 #ifdef NDEBUG
1983 #define ASSERT_NO_TMPL(m_session, m_tid, m_err)
1984 #else
1985 #define ASSERT_NO_TMPL(m_session, m_tid, m_err) \
1986 do { \
1987 assert(fbSessionGetTemplate(m_session, TRUE, (m_tid), m_err) \
1988 == NULL); \
1989 assert(g_error_matches(*(m_err), FB_ERROR_DOMAIN, FB_ERROR_TMPL)); \
1990 g_clear_error(m_err); \
1991 } while(0)
1992 #endif /* #else of #ifdef NDEBUG */
1993
1994 model = fbSessionGetInfoModel(session);
1995
1996 /* Add the "Give me everything" record template */
1997 tmpl = fbTemplateAlloc(model);
1998 if (!fbTemplateAppendSpecArray(tmpl, ski_fixrec_spec, sampler_flags, err)){
1999 goto ERROR;
2000 }
2001 ASSERT_NO_TMPL(session, SKI_FIXREC_TID, err);
2002 if (!fbSessionAddTemplate(session, TRUE, SKI_FIXREC_TID, tmpl, err)) {
2003 goto ERROR;
2004 }
2005
2006 /* Add the TCP record template */
2007 tmpl = fbTemplateAlloc(model);
2008 if (!fbTemplateAppendSpecArray(tmpl, ski_tcp_stml_spec, 0, err)) {
2009 goto ERROR;
2010 }
2011 ASSERT_NO_TMPL(session, SKI_TCP_STML_TID, err);
2012 if (!fbSessionAddTemplate(session, TRUE, SKI_TCP_STML_TID, tmpl, err)) {
2013 goto ERROR;
2014 }
2015
2016 /* Add the yaf stats record template */
2017 tmpl = fbTemplateAlloc(model);
2018 if (!fbTemplateAppendSpecArray(tmpl, ski_yafstats_spec, 0, err)) {
2019 goto ERROR;
2020 }
2021 ASSERT_NO_TMPL(session, SKI_YAFSTATS_TID, err);
2022 if (!fbSessionAddTemplate(session, TRUE, SKI_YAFSTATS_TID, tmpl, err)) {
2023 goto ERROR;
2024 }
2025
2026 /* Add the yaf tombstone record template */
2027 tmpl = fbTemplateAlloc(model);
2028 if (!fbTemplateAppendSpecArray(tmpl, ski_tombstone_spec, 0, err)) {
2029 goto ERROR;
2030 }
2031 ASSERT_NO_TMPL(session, SKI_TOMBSTONE_TID, err);
2032 if (!fbSessionAddTemplate(session, TRUE, SKI_TOMBSTONE_TID, tmpl, err)) {
2033 goto ERROR;
2034 }
2035
2036 /* Add the yaf tombstone record template */
2037 tmpl = fbTemplateAlloc(model);
2038 if (!fbTemplateAppendSpecArray(tmpl, ski_tombstone_access_spec, 0, err)) {
2039 goto ERROR;
2040 }
2041 ASSERT_NO_TMPL(session, SKI_TOMBSTONE_ACCESS_TID, err);
2042 if (!fbSessionAddTemplate(
2043 session, TRUE, SKI_TOMBSTONE_ACCESS_TID, tmpl, err)) {
2044 goto ERROR;
2045 }
2046
2047 /* Add the netflow v9 sampling options template */
2048 tmpl = fbTemplateAlloc(model);
2049 if (!fbTemplateAppendSpecArray(
2050 tmpl, ski_nf9sampling_spec, sampler_flags, err))
2051 {
2052 goto ERROR;
2053 }
2054 ASSERT_NO_TMPL(session, SKI_NF9SAMPLING_TID,err);
2055 if (!fbSessionAddTemplate(session, TRUE, SKI_NF9SAMPLING_TID, tmpl, err)){
2056 goto ERROR;
2057 }
2058
2059 /* Add the "do nothing/ignore record" template */
2060 tmpl = fbTemplateAlloc(model);
2061 if (!fbTemplateAppendSpecArray(tmpl, ski_ignore_spec, 0, err)) {
2062 goto ERROR;
2063 }
2064 ASSERT_NO_TMPL(session, SKI_IGNORE_TID, err);
2065 if (!fbSessionAddTemplate(session, TRUE, SKI_IGNORE_TID, tmpl, err)) {
2066 goto ERROR;
2067 }
2068
2069 /* Add the various ski_yafrec_spec templates */
2070 for (i = 0; yafrec_spec_flags[i] != 0; ++i) {
2071 tmpl = fbTemplateAlloc(model);
2072 if (!fbTemplateAppendSpecArray(
2073 tmpl, ski_yafrec_spec, yafrec_spec_flags[i], err))
2074 {
2075 goto ERROR;
2076 }
2077 tid = (SKI_YAFREC_TID | yafrec_spec_flags[i]);
2078 ASSERT_NO_TMPL(session, tid, err);
2079 if (!fbSessionAddTemplate(session, TRUE, tid, tmpl, err)) {
2080 goto ERROR;
2081 }
2082 }
2083
2084 /* Add the various ski_nf9rec_spec templates */
2085 for (i = 0; nf9rec_spec_flags[i] != 0; ++i) {
2086 tmpl = fbTemplateAlloc(model);
2087 if (!fbTemplateAppendSpecArray(
2088 tmpl, ski_nf9rec_spec, nf9rec_spec_flags[i], err))
2089 {
2090 goto ERROR;
2091 }
2092 tid = (SKI_NF9REC_TID | nf9rec_spec_flags[i]);
2093 ASSERT_NO_TMPL(session, tid, err);
2094 if (!fbSessionAddTemplate(session, TRUE, tid, tmpl, err)) {
2095 goto ERROR;
2096 }
2097 }
2098
2099 /* Invoke the function above when a new template arrives. */
2100 #if FIXBUF_CHECK_VERSION(2,0,0)
2101 fbSessionAddNewTemplateCallback(session, &skiTemplateCallbackCtx, NULL);
2102 #else
2103 fbSessionAddTemplateCtxCallback2(session, &skiTemplateCallbackCtx, NULL);
2104 #endif
2105
2106 TRACE_RETURN(1);
2107
2108 ERROR:
2109 fbTemplateFreeUnused(tmpl);
2110 TRACE_RETURN(0);
2111 }
2112
2113
2114 /* **************************************************************
2115 * ***** Support for reading/import
2116 */
2117
2118 /**
2119 * Use the external template of the next record to determine its
2120 * type.
2121 *
2122 * Fill the 'tmpl' member of 'record' with the incoming template
2123 * for the next record, fill the 'bmap' member of 'record' with the
2124 * bitmap settings for that template, fill the 'rectype' member of
2125 * 'record' with the next record's type, and return that type.
2126 */
2127 static ski_rectype_t
ski_rectype_next(fBuf_t * fbuf,ski_record_t * record,GError ** err)2128 ski_rectype_next(
2129 fBuf_t *fbuf,
2130 ski_record_t *record,
2131 GError **err)
2132 {
2133 record->tmpl = fBufNextCollectionTemplate(fbuf, &record->tid, err);
2134 if (record->tmpl == NULL) {
2135 return (record->rectype = SKI_RECTYPE_ERROR);
2136 }
2137 record->bmap = BMAP_TMPL_CTX_GET(record->tmpl);
2138
2139 /* Handle Records that use an Options Template */
2140 if (fbTemplateGetOptionsScope(record->tmpl)) {
2141 if (record->bmap & (TMPL_BIT_flowTableFlushEventCount
2142 | TMPL_BIT_flowTablePeakCount))
2143 {
2144 return (record->rectype = SKI_RECTYPE_YAFSTATS);
2145 }
2146 if (record->bmap & TMPL_BIT_tombstoneId) {
2147 return (record->rectype = SKI_RECTYPE_TOMBSTONE);
2148 }
2149 if (record->bmap
2150 & (TMPL_BIT_samplingAlgorithm | TMPL_BIT_samplerMode))
2151 {
2152 return (record->rectype = SKI_RECTYPE_NF9SAMPLING);
2153 }
2154 return (record->rectype = SKI_RECTYPE_IGNORE);
2155 }
2156
2157 switch (record->bmap & BMAP_RECTYPE_MASK) {
2158 case 4:
2159 return (record->rectype = SKI_RECTYPE_YAFREC);
2160 case 2: case 6:
2161 return (record->rectype = SKI_RECTYPE_NF9REC);
2162 case 1: case 3: case 5: case 7:
2163 return (record->rectype = SKI_RECTYPE_FIXREC);
2164 case 0:
2165 break;
2166 default:
2167 skAbortBadCase(record->bmap & BMAP_RECTYPE_MASK);
2168 }
2169 return (record->rectype = SKI_RECTYPE_IGNORE);
2170 }
2171
2172
2173 /**
2174 * Call fBufNext() and transcode the data into the
2175 * ski_yafstats_spec template. Return 1 on success or 0 on
2176 * failure.
2177 */
2178 static int
ski_yafstats_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)2179 ski_yafstats_next(
2180 fBuf_t *fbuf,
2181 ski_record_t *record,
2182 const skpc_probe_t *probe,
2183 GError **err)
2184 {
2185 size_t len;
2186 DEFINE_PREFIX_BUF(prefix);
2187
2188 SK_UNUSED_PARAM(probe);
2189 makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
2190 fbSessionGetDomain(fBufGetSession(fbuf)),
2191 record->tid, record->tmpl);
2192 TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_yafstats_next()"),
2193 prefix, record->bmap));
2194 assert(SKI_RECTYPE_YAFSTATS == record->rectype);
2195
2196 /* Set internal template to read a yaf stats record */
2197 if (!fBufSetInternalTemplate(fbuf, SKI_YAFSTATS_TID, err)) {
2198 return FALSE;
2199 }
2200
2201 len = sizeof(record->data.yafstats);
2202 return fBufNext(fbuf, (uint8_t *)&record->data.yafstats, &len, err);
2203 }
2204
2205
2206 /**
2207 * Update the statistics on 'source' by, for each field, adding to
2208 * 'source' the difference of the value the field in 'record' from
2209 * the value of the field in 'previous'. Finally, copy the values
2210 * from 'current' into 'previous'.
2211 */
2212 static void
ski_yafstats_update_source(skIPFIXSource_t * source,const ski_record_t * record,ski_yafstats_t * previous)2213 ski_yafstats_update_source(
2214 skIPFIXSource_t *source,
2215 const ski_record_t *record,
2216 ski_yafstats_t *previous)
2217 {
2218 const ski_yafstats_t *current;
2219
2220 assert(SKI_RECTYPE_YAFSTATS == record->rectype);
2221
2222 current = &record->data.yafstats;
2223
2224 DEBUGMSG("'%s': Got a yaf stats record", source->name);
2225 TRACEMSG(1, (("'%s': "
2226 "inittime %" PRIu64
2227 ", dropped %" PRIu64
2228 ", ignored %" PRIu64
2229 ", notsent %" PRIu64
2230 ", expired %" PRIu32
2231 ", pkttotal %" PRIu64
2232 ", exported %" PRIu64),
2233 source->name,
2234 current->systemInitTimeMilliseconds,
2235 current->droppedPacketTotalCount,
2236 current->ignoredPacketTotalCount,
2237 current->notSentPacketTotalCount,
2238 current->expiredFragmentCount,
2239 current->packetTotalCount,
2240 current->exportedFlowRecordTotalCount));
2241
2242 if (current->systemInitTimeMilliseconds
2243 != previous->systemInitTimeMilliseconds)
2244 {
2245 memset(previous, 0, sizeof(*previous));
2246 }
2247
2248 pthread_mutex_lock(&source->stats_mutex); \
2249 source->saw_yafstats_pkt = 1; \
2250 source->yaf_dropped_packets += (current->droppedPacketTotalCount
2251 - previous->droppedPacketTotalCount);
2252 source->yaf_ignored_packets += (current->ignoredPacketTotalCount
2253 - previous->ignoredPacketTotalCount);
2254 source->yaf_notsent_packets += (current->notSentPacketTotalCount
2255 - previous->notSentPacketTotalCount);
2256 source->yaf_expired_fragments += (current->expiredFragmentCount
2257 - previous->expiredFragmentCount);
2258 source->yaf_processed_packets += (current->packetTotalCount
2259 - previous->packetTotalCount);
2260 source->yaf_exported_flows += (current->exportedFlowRecordTotalCount
2261 - previous->exportedFlowRecordTotalCount);
2262 pthread_mutex_unlock(&source->stats_mutex);
2263 *previous = *current;
2264 }
2265
2266
#if SKIPFIX_ENABLE_TOMBSTONE_TIMES
/**
 *    Write into 'buffer', whose size is 'length', a text fragment
 *    giving the process name for 'tool_id' and the access time
 *    'seconds' formatted as a UTC timestamp without milliseconds.
 *
 *    A 'tool_id' outside the table of known tools is shown as
 *    "unknown(ID)".  Return the value returned by snprintf().
 */
static ssize_t
ski_tombstone_add_access(
    char               *buffer,
    size_t              length,
    uint32_t            seconds,
    uint32_t            tool_id)
{
    /* process names, indexed by tool_id */
    static const char *tool[] = {
        "unknown(0)", "yaf", "super_mediator", "rwflowpack", "rwflowappend",
        "mothra-packer", "pipeline"
    };
    const size_t tool_count = sizeof(tool) / sizeof(tool[0]);
    char stime_buf[SKTIMESTAMP_STRLEN];

    sktimestamp_r(stime_buf, sktimeCreate(seconds, 0),
                  SKTIMESTAMP_UTC | SKTIMESTAMP_NOMSEC);

    if (tool_id >= tool_count) {
        /* no name for this tool; print its numeric ID */
        return snprintf(buffer,length,"; process: unknown(%" PRIu32 "), time: %sZ",
                        tool_id, stime_buf);
    }
    return snprintf(buffer, length, "; process: %s, time: %sZ",
                    tool[tool_id], stime_buf);
}
#endif  /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
2294
2295 /**
2296 * Read a YAF Options Record containing the tombstone counter and
2297 * print a log message.
2298 */
2299 static gboolean
ski_tombstone_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)2300 ski_tombstone_next(
2301 fBuf_t *fbuf,
2302 ski_record_t *record,
2303 const skpc_probe_t *probe,
2304 GError **err)
2305 {
2306 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
2307 const ski_tombstone_access_t *ts_access;
2308 void *stl;
2309 #endif
2310 const ski_tombstone_t *ts;
2311 char buf[1024];
2312 char *b;
2313 size_t len;
2314 ssize_t sz;
2315
2316 makeTracemsgPrefix(buf, sizeof(buf), skpcProbeGetName(probe),
2317 fbSessionGetDomain(fBufGetSession(fbuf)),
2318 record->tid, record->tmpl);
2319 TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_tombstone_next()"),
2320 buf, record->bmap));
2321 assert(SKI_RECTYPE_TOMBSTONE == record->rectype);
2322
2323 /* Set internal template to read the options record */
2324 if (!fBufSetInternalTemplate(fbuf, SKI_TOMBSTONE_TID, err)) {
2325 return FALSE;
2326 }
2327 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
2328 fbSubTemplateListCollectorInit(&record->data.tombstone.stl);
2329 #if FIXBUF_CHECK_VERSION(2,3,0)
2330 fbSubTemplateListCollectorInit(&record->data.tombstone.tombstoneAccessList);
2331 #endif
2332 #endif /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
2333
2334 len = sizeof(record->data.tombstone);
2335 if (!fBufNext(fbuf, (uint8_t *)&record->data.tombstone, &len, err)) {
2336 return FALSE;
2337 }
2338 assert(len == sizeof(ski_tombstone_t));
2339 ts = &record->data.tombstone;
2340
2341 len = sizeof(buf);
2342 b = buf;
2343
2344 if (ts->stl.numElements) {
2345 /* This tombstone record pre-dates YAF 2.11 */
2346 assert(0 == ts->exportingProcessId);
2347 sz = snprintf(b, len, ("'%s': Received tombstone record:"
2348 " exporterId: %u:%u, tombstoneId: %u"),
2349 skpcProbeGetName(probe), ts->exporterConfiguredId,
2350 ts->exporterUniqueId, ts->tombstoneId);
2351 if (len < (size_t)sz) {
2352 goto WRITEMSG;
2353 }
2354 b += sz;
2355 len -= sz;
2356
2357 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
2358 stl = NULL;
2359 while ((stl = fbSubTemplateListGetNextPtr(&ts->stl, stl))) {
2360 ts_access = (ski_tombstone_access_t *)stl;
2361 sz = ski_tombstone_add_access(b, len,
2362 ts_access->observationTimeSeconds,
2363 ts_access->exportingProcessId);
2364 if (len < (size_t)sz) {
2365 goto WRITEMSG;
2366 }
2367 b += sz;
2368 len -= sz;
2369 }
2370 #endif /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
2371 } else {
2372 /* This tombstone record is from YAF 2.11 or later */
2373 assert(0 == ts->exporterUniqueId);
2374 sz = snprintf(b, len, ("'%s': Received Tombstone record:"
2375 " observationDomain:%u,"
2376 " exporterId:%u:%u, tombstoneId: %u"),
2377 skpcProbeGetName(probe), ts->observationDomainId,
2378 ts->exporterConfiguredId, ts->exportingProcessId,
2379 ts->tombstoneId);
2380 if (len < (size_t)sz) {
2381 goto WRITEMSG;
2382 }
2383 b += sz;
2384 len -= sz;
2385
2386 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES && FIXBUF_CHECK_VERSION(2,3,0)
2387 stl = NULL;
2388 while ((stl = fbSubTemplateListGetNextPtr(&ts->tombstoneAccessList,
2389 stl)))
2390 {
2391 ts_access = (ski_tombstone_access_t *)stl;
2392 sz = ski_tombstone_add_access(b, len,
2393 ts_access->observationTimeSeconds,
2394 ts_access->certToolId);
2395 if (len < (size_t)sz) {
2396 goto WRITEMSG;
2397 }
2398 b += sz;
2399 len -= sz;
2400 }
2401 #endif /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
2402 }
2403
2404 WRITEMSG:
2405 buf[sizeof(buf)-1] = '\0';
2406 DEBUGMSG("%s", buf);
2407
2408 #if SKIPFIX_ENABLE_TOMBSTONE_TIMES
2409 fbSubTemplateListClear((fbSubTemplateList_t *)&ts->stl);
2410 #if FIXBUF_CHECK_VERSION(2,3,0)
2411 fbSubTemplateListClear((fbSubTemplateList_t *)&ts->tombstoneAccessList);
2412 #endif
2413 #endif /* SKIPFIX_ENABLE_TOMBSTONE_TIMES */
2414
2415 return TRUE;
2416 }
2417
2418
2419 /* Determine which names are used for certain elements in the
2420 * information model. */
2421 void
ski_nf9sampling_check_spec(void)2422 ski_nf9sampling_check_spec(
2423 void)
2424 {
2425 fbInfoModel_t *model;
2426 const fbInfoElementSpec_t *spec;
2427 uint32_t flags;
2428
2429 model = skiInfoModel();
2430 flags = 0;
2431
2432 for (spec = ski_nf9sampling_spec; spec->name; ++spec) {
2433 if (0 == spec->flags) {
2434 assert(fbInfoModelGetElementByName(model, spec->name));
2435 }
2436 else if (fbInfoModelGetElementByName(model, spec->name)) {
2437 if (0 == flags) {
2438 flags = spec->flags;
2439 } else if (spec->flags != flags) {
2440 skAppPrintErr("Info Element '%s' is in model; flags = %u",
2441 spec->name, flags);
2442 skAbort();
2443 }
2444 } else if (flags && spec->flags == flags) {
2445 skAppPrintErr("Info Element '%s' not in model; flags = %u",
2446 spec->name, flags);
2447 skAbort();
2448 }
2449 }
2450
2451 sampler_flags = flags;
2452
2453 skiInfoModelFree();
2454 }
2455
2456
2457 /**
2458 * Read a NetFlowV9 Options Record regarding the flow sampling rate
2459 * and write a message to the log file.
2460 */
2461 static gboolean
ski_nf9sampling_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)2462 ski_nf9sampling_next(
2463 fBuf_t *fbuf,
2464 ski_record_t *record,
2465 const skpc_probe_t *probe,
2466 GError **err)
2467 {
2468 size_t len;
2469
2470 DEFINE_PREFIX_BUF(prefix);
2471 makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
2472 fbSessionGetDomain(fBufGetSession(fbuf)),
2473 record->tid, record->tmpl);
2474 TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_nf9sampling_next()"),
2475 prefix, record->bmap));
2476 assert(SKI_RECTYPE_NF9SAMPLING == record->rectype);
2477
2478 /* Set internal template to read the options record */
2479 if (!fBufSetInternalTemplate(fbuf, SKI_NF9SAMPLING_TID, err)) {
2480 return FALSE;
2481 }
2482
2483 len = sizeof(record->data.nf9sampling);
2484 if (!fBufNext(fbuf, (uint8_t *)&record->data.nf9sampling, &len, err)) {
2485 return FALSE;
2486 }
2487 assert(len == sizeof(ski_nf9sampling_t));
2488
2489 if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_SAMPLING) {
2490 if (record->bmap & TMPL_BIT_samplingAlgorithm) {
2491 INFOMSG("'%s': Sampling Algorithm %u; Sampling Interval %u",
2492 skpcProbeGetName(probe),
2493 record->data.nf9sampling.samplingAlgorithm,
2494 record->data.nf9sampling.samplingInterval);
2495 } else if (record->bmap & TMPL_BIT_samplerMode) {
2496 INFOMSG(("'%s': Flow Sampler Id %u; Flow Sampler Mode %u;"
2497 " Flow Sampler Random Interval %u"),
2498 skpcProbeGetName(probe),
2499 record->data.nf9sampling.samplerId,
2500 record->data.nf9sampling.samplerMode,
2501 record->data.nf9sampling.samplerRandomInterval);
2502 }
2503 }
2504 return TRUE;
2505 }
2506
2507
2508 /**
2509 * Call fBufNext() and transcode the data into the ski_ignore_spec
2510 * template. Return 1 on success or 0 on failure.
2511 */
2512 static int
ski_ignore_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)2513 ski_ignore_next(
2514 fBuf_t *fbuf,
2515 ski_record_t *record,
2516 const skpc_probe_t *probe,
2517 GError **err)
2518 {
2519 size_t len;
2520 DEFINE_PREFIX_BUF(prefix);
2521
2522 SK_UNUSED_PARAM(probe);
2523 makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
2524 fbSessionGetDomain(fBufGetSession(fbuf)),
2525 record->tid, record->tmpl);
2526 TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_ignore_next()"),
2527 prefix, record->bmap));
2528
2529 if (!fBufSetInternalTemplate(fbuf, SKI_IGNORE_TID, err)) {
2530 return FALSE;
2531 }
2532
2533 len = sizeof(record->data.ignore);
2534 return fBufNext(fbuf, (uint8_t *)&record->data.ignore, &len, err);
2535 }
2536
2537
/*
 *    Return, as a count of milliseconds since the UNIX epoch, the
 *    time held in the NTP timestamp 'ntp'.  Pass 0 as 'is_micro'
 *    when decoding a dateTimeNanoseconds value and non-zero when
 *    decoding a dateTimeMicroseconds value.
 *
 *    The upper 32 bits of an NTP timestamp are whole seconds since
 *    Jan 1, 1900, and the lower 32 bits are a fraction of a second
 *    in units of 1/(2^32) second.
 *
 *    For dateTimeMicroseconds ('is_micro' is set), the 11 lowest
 *    bits of the fraction are ignored.
 *
 *    A value of 0 for 'ntp' is taken to mean the element was not in
 *    the model, and 0 is returned.
 */
static uint64_t
skiNTPDecode(
    uint64_t            ntp,
    int                 is_micro)
{
    /* the UNIX epoch as a number of seconds since NTP epoch */
#define JAN_1970  UINT64_C(0x83AA7E80)

    uint64_t fraction;
    uint64_t whole_msec;
    double frac_msec;

    if (0 == ntp) {
        return 0;
    }

    /* isolate the fractional seconds, dropping the low 11 bits for
     * microsecond values */
    fraction = ntp & UINT32_MAX;
    if (is_micro) {
        fraction &= UINT32_C(0xFFFFF800);
    }

    /* scale the fraction from 1/(2^32) seconds to milliseconds */
    frac_msec = 1000.0 * fraction / (double)UINT64_C(0x100000000);

    /* shift whole seconds to the UNIX epoch; convert to milliseconds */
    whole_msec = ((ntp >> 32) - JAN_1970) * 1000;

    return whole_msec + (uint64_t)frac_msec;
}
2579
2580
2581 /*
2582 * Run the Gauntlet of Time.
2583 *
2584 * Set the start-time and duration of the 'fwd_rec' member of
2585 * 'record' by checking the 'fixrec' member for the dozen or so
2586 * time fields that IPFIX provides.
2587 */
2588 static void
skiGauntletOfTime(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe)2589 skiGauntletOfTime(
2590 fBuf_t *fbuf,
2591 ski_record_t *record,
2592 const skpc_probe_t *probe)
2593 {
2594 struct log_rec_time_st {
2595 /* "raw" start time from the record */
2596 uint64_t start_val;
2597 /* name of the IE in the 'start_val' member, NULL if none */
2598 const char *start_name;
2599 /* "raw" end time from the record */
2600 uint64_t end_val;
2601 /* name of the IE in the 'end_val' member, NULL if none */
2602 const char *end_name;
2603 /* "raw" duration time from the record */
2604 uint64_t dur_val;
2605 /* name of the IE in the 'dur_val' member, NULL if none */
2606 const char *dur_name;
2607 } log_rec_time = {
2608 0, NULL, 0, NULL, 0, NULL
2609 };
2610 char stime_buf[SKTIMESTAMP_STRLEN];
2611 uint64_t sTime, eTime;
2612 uint32_t duration;
2613 sktime_t export_time;
2614 intmax_t uptime, difference;
2615 sktime_t export_msec;
2616 const char *rollover_first;
2617 const char *rollover_last = "";
2618 const ski_fixrec_t *fixrec;
2619 rwRec *rec;
2620
2621 /* which of sTime(1), duration(2), and eTime(4) were found, and
2622 * whether export_time(8) was used for sTime or eTime */
2623 unsigned int time_fields;
2624
2625 #define COMPUTE_DURATION(s_time, e_time) \
2626 (((e_time < s_time) || ((e_time - s_time) > UINT32_MAX)) \
2627 ? UINT32_MAX \
2628 : (e_time - s_time))
2629
2630 #define RECORD_SET_TIME_RETURN_NO_LOG(s_time, elapsed) \
2631 rwRecSetStartTime(rec, (sktime_t)s_time); \
2632 rwRecSetElapsed(rec, elapsed); \
2633 if (!(skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS)) { \
2634 return; \
2635 }
2636
2637 fixrec = &record->data.fixrec;
2638 rec = record->fwd_rec;
2639
2640 TRACEMSG(3, ("bmap & time_mask: " BMAP_PRI " => " BMAP_PRI,
2641 record->bmap,
2642 (BMAP_TYPE)(record->bmap & TMPL_MASK_GAUNTLET_OF_TIME)));
2643
2644 switch (record->bmap & TMPL_MASK_GAUNTLET_OF_TIME) {
2645 case (TMPL_BIT_flowStartMilliseconds | TMPL_BIT_flowEndMilliseconds):
2646 sTime = fixrec->flowStartMilliseconds;
2647 eTime = fixrec->flowEndMilliseconds;
2648 RECORD_SET_TIME_RETURN_NO_LOG(sTime, COMPUTE_DURATION(sTime, eTime));
2649 time_fields = 5;
2650 log_rec_time.start_val = fixrec->flowStartMilliseconds;
2651 log_rec_time.start_name = "flowStartMilliseconds";
2652 log_rec_time.end_val = fixrec->flowEndMilliseconds;
2653 log_rec_time.end_name = "flowEndMilliseconds";
2654 break;
2655
2656 case (TMPL_BIT_flowStartSeconds | TMPL_BIT_flowEndSeconds):
2657 sTime = sktimeCreate(fixrec->flowStartSeconds, 0);
2658 eTime = sktimeCreate(fixrec->flowEndSeconds, 0);
2659 RECORD_SET_TIME_RETURN_NO_LOG(sTime, COMPUTE_DURATION(sTime, eTime));
2660 time_fields = 5;
2661 log_rec_time.start_val = fixrec->flowStartSeconds;
2662 log_rec_time.start_name = "flowStartSeconds";
2663 log_rec_time.end_val = fixrec->flowEndSeconds;
2664 log_rec_time.end_name = "flowEndSeconds";
2665 break;
2666
2667 case (TMPL_BIT_flowStartMicroseconds | TMPL_BIT_flowEndMicroseconds):
2668 sTime = skiNTPDecode(fixrec->flowStartMicroseconds, 1);
2669 eTime = skiNTPDecode(fixrec->flowEndMicroseconds, 1);
2670 RECORD_SET_TIME_RETURN_NO_LOG(sTime, COMPUTE_DURATION(sTime, eTime));
2671 time_fields = 5;
2672 log_rec_time.start_val = fixrec->flowStartMicroseconds;
2673 log_rec_time.start_name = "flowStartMicroseconds";
2674 log_rec_time.end_val = fixrec->flowEndMicroseconds;
2675 log_rec_time.end_name = "flowEndMicroseconds";
2676 break;
2677
2678 case (TMPL_BIT_flowStartNanoseconds | TMPL_BIT_flowEndNanoseconds):
2679 sTime = skiNTPDecode(fixrec->flowStartNanoseconds, 0);
2680 eTime = skiNTPDecode(fixrec->flowEndNanoseconds, 0);
2681 RECORD_SET_TIME_RETURN_NO_LOG(sTime, COMPUTE_DURATION(sTime, eTime));
2682 time_fields = 5;
2683 log_rec_time.start_val = fixrec->flowStartNanoseconds;
2684 log_rec_time.start_name = "flowStartNanoseconds";
2685 log_rec_time.end_val = fixrec->flowEndNanoseconds;
2686 log_rec_time.end_name = "flowEndNanoseconds";
2687 break;
2688
2689 case (TMPL_BIT_flowStartDeltaMicroseconds
2690 | TMPL_BIT_flowEndDeltaMicroseconds):
2691 export_time = sktimeCreate(fBufGetExportTime(fbuf), 0);
2692 sTime = (export_time - fixrec->flowStartDeltaMicroseconds / 1000);
2693 eTime = (export_time - fixrec->flowEndDeltaMicroseconds / 1000);
2694 RECORD_SET_TIME_RETURN_NO_LOG(sTime, COMPUTE_DURATION(sTime, eTime));
2695 time_fields = 13;
2696 log_rec_time.start_val = fixrec->flowStartDeltaMicroseconds;
2697 log_rec_time.start_name = "flowStartDeltaMicroseconds";
2698 log_rec_time.end_val = fixrec->flowEndDeltaMicroseconds;
2699 log_rec_time.end_name = "flowEndDeltaMicroseconds";
2700 break;
2701
2702 case (TMPL_BIT_flowStartMilliseconds| TMPL_BIT_flowDurationMilliseconds):
2703 sTime = fixrec->flowStartMilliseconds;
2704 duration = fixrec->flowDurationMilliseconds;
2705 RECORD_SET_TIME_RETURN_NO_LOG(sTime, duration);
2706 time_fields = 3;
2707 log_rec_time.start_val = fixrec->flowStartMilliseconds;
2708 log_rec_time.start_name = "flowStartMilliseconds";
2709 log_rec_time.dur_val = fixrec->flowDurationMilliseconds;
2710 log_rec_time.dur_name = "flowDurationMilliseconds";
2711 break;
2712
2713 case (TMPL_BIT_flowStartMicroseconds| TMPL_BIT_flowDurationMicroseconds):
2714 sTime = skiNTPDecode(fixrec->flowStartMicroseconds, 1);
2715 duration = fixrec->flowDurationMicroseconds / 1000;
2716 RECORD_SET_TIME_RETURN_NO_LOG(sTime, duration);
2717 time_fields = 3;
2718 log_rec_time.start_val = fixrec->flowStartMicroseconds;
2719 log_rec_time.start_name = "flowStartMicroseconds";
2720 log_rec_time.dur_val = fixrec->flowDurationMicroseconds;
2721 log_rec_time.dur_name = "flowDurationMicroseconds";
2722 break;
2723
2724 case 0:
2725 /* no times, set start to export time and set dur to 0 */
2726 sTime = sktimeCreate(fBufGetExportTime(fbuf), 0);
2727 RECORD_SET_TIME_RETURN_NO_LOG(sTime, 0);
2728 time_fields = 0;
2729 break;
2730
2731 case (TMPL_BIT_flowStartSysUpTime | TMPL_BIT_systemInitTimeMilliseconds):
2732 /* Times based on flow generator system uptimes (Netflow v9) */
2733
2734 /* Compute the uptime: systemInitTimeMilliseconds is the
2735 * absolute router boot time (msec), and libfixbuf sets it by
2736 * subtracting the NFv9 uptime (msec) from the record's
2737 * absolute export time (sec). */
2738 export_msec = sktimeCreate(fBufGetExportTime(fbuf), 0);
2739 uptime = export_msec - fixrec->systemInitTimeMilliseconds;
2740 if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_NF9_SYSUPTIME_SECS) {
2741 /* uptime was reported in seconds, not msec */
2742 TRACEMSG(3, (("Before adjustment: exportTimeMillisec %" PRIu64
2743 ", initTimeMillisec %" PRIu64 ", upTime %" PRIdMAX
2744 ", startUpTime %" PRIu32 ", endUpTime %" PRIu32
2745 ", packets %" PRIu32),
2746 export_msec, fixrec->systemInitTimeMilliseconds,
2747 uptime, fixrec->flowStartSysUpTime,
2748 fixrec->flowEndSysUpTime, rwRecGetPkts(rec)));
2749 uptime *= 1000;
2750 record->data.fixrec.systemInitTimeMilliseconds
2751 = export_msec - uptime;
2752 if (rwRecGetPkts(rec) == 1
2753 && (fixrec->flowEndSysUpTime < fixrec->flowStartSysUpTime))
2754 {
2755 /* sometimes the end time for single packet flows is
2756 * very different than the start time. */
2757 record->data.fixrec.flowEndSysUpTime
2758 = record->data.fixrec.flowStartSysUpTime;
2759 }
2760 }
2761
2762 /* Set the duration. */
2763 if (fixrec->flowStartSysUpTime <= fixrec->flowEndSysUpTime) {
2764 rwRecSetElapsed(rec, (fixrec->flowEndSysUpTime
2765 - fixrec->flowStartSysUpTime));
2766 } else {
2767 /* assume EndTime rolled-over and start did not */
2768 rwRecSetElapsed(rec, (ROLLOVER32 + fixrec->flowEndSysUpTime
2769 - fixrec->flowStartSysUpTime));
2770 rollover_last = ", assume flowEndSysUpTime rollover";
2771 }
2772 /* Set start time. */
2773 difference = uptime - fixrec->flowStartSysUpTime;
2774 if (difference > MAXIMUM_FLOW_TIME_DEVIATION) {
2775 /* assume upTime is set before record is composed and
2776 * that start-time has rolled over. */
2777 rwRecSetStartTime(rec, (fixrec->systemInitTimeMilliseconds
2778 + fixrec->flowStartSysUpTime
2779 + ROLLOVER32));
2780 rollover_first = ", assume flowStartSysUpTime rollover";
2781 } else if (-difference > MAXIMUM_FLOW_TIME_DEVIATION) {
2782 /* assume upTime is set after record is composed and
2783 * that upTime has rolled over. */
2784 rwRecSetStartTime(rec, (fixrec->systemInitTimeMilliseconds
2785 + fixrec->flowStartSysUpTime
2786 - ROLLOVER32));
2787 rollover_first = ", assume sysUpTime rollover";
2788 } else {
2789 /* times look reasonable; assume no roll over */
2790 rwRecSetStartTime(rec, (fixrec->systemInitTimeMilliseconds
2791 + fixrec->flowStartSysUpTime));
2792 rollover_first = "";
2793 }
2794 if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS) {
2795 sktimestamp_r(stime_buf, rwRecGetStartTime(rec), SKTIMESTAMP_UTC);
2796 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
2797 " flowStartSysUpTime=%" PRIu32
2798 ", flowEndSysUpTime=%" PRIu32
2799 ", systemInitTimeMilliseconds=%" PRIu64
2800 ", exportTimeSeconds=%" PRIu32
2801 ", calculated sysUpTime=%" PRIdMAX "%s%s"),
2802 skpcProbeGetName(probe),
2803 stime_buf, (double)rwRecGetElapsed(rec)/1000,
2804 fixrec->flowStartSysUpTime, fixrec->flowEndSysUpTime,
2805 fixrec->systemInitTimeMilliseconds,
2806 fBufGetExportTime(fbuf), uptime,
2807 rollover_first, rollover_last);
2808 }
2809 return;
2810
2811 case (TMPL_BIT_flowStartSysUpTime):
2812 /* Times based on flow generator system uptimes (Netflow v9),
2813 * but there is no system init time and we do not know when
2814 * the router booted. assume end-time is same as the record's
2815 * export time and set start-time accordingly. */
2816 if (fixrec->flowStartSysUpTime <= fixrec->flowEndSysUpTime) {
2817 rwRecSetElapsed(rec, (fixrec->flowEndSysUpTime
2818 - fixrec->flowStartSysUpTime));
2819 } else {
2820 /* assume EndTime rolled-over and start did not */
2821 rwRecSetElapsed(rec, (ROLLOVER32 + fixrec->flowEndSysUpTime
2822 - fixrec->flowStartSysUpTime));
2823 rollover_last = ", assume flowEndSysUpTime rollover";
2824 }
2825 /* Set start time. */
2826 export_msec = sktimeCreate(fBufGetExportTime(fbuf), 0);
2827 rwRecSetStartTime(rec, export_msec - rwRecGetElapsed(rec));
2828 if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS) {
2829 sktimestamp_r(stime_buf, rwRecGetStartTime(rec), SKTIMESTAMP_UTC);
2830 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
2831 " flowStartSysUpTime=%" PRIu32
2832 ", flowEndSysUpTime=%" PRIu32
2833 ", no systemInitTimeMilliseconds"
2834 ", set end to exportTimeSeconds=%" PRIu32 "%s"),
2835 skpcProbeGetName(probe),
2836 stime_buf, (double)rwRecGetElapsed(rec)/1000,
2837 fixrec->flowStartSysUpTime, fixrec->flowEndSysUpTime,
2838 fBufGetExportTime(fbuf), rollover_last);
2839 }
2840 return;
2841
2842 default:
2843 time_fields = 0;
2844 /* look for all possible start times */
2845 if (record->bmap & TMPL_BIT_flowStartMilliseconds) {
2846 time_fields |= 1;
2847 sTime = fixrec->flowStartMilliseconds;
2848 log_rec_time.start_val = fixrec->flowStartMilliseconds;
2849 log_rec_time.start_name = "flowStartMilliseconds";
2850 } else if (record->bmap & TMPL_BIT_flowStartSeconds) {
2851 time_fields |= 1;
2852 sTime = UINT64_C(1000) * (uint64_t)fixrec->flowStartSeconds;
2853 log_rec_time.start_val = fixrec->flowStartSeconds;
2854 log_rec_time.start_name = "flowStartSeconds";
2855 } else if (record->bmap & TMPL_BIT_flowStartMicroseconds) {
2856 time_fields |= 1;
2857 sTime = skiNTPDecode(fixrec->flowStartMicroseconds, 1);
2858 log_rec_time.start_val = fixrec->flowStartMicroseconds;
2859 log_rec_time.start_name = "flowStartMicroseconds";
2860 } else if (record->bmap & TMPL_BIT_flowStartNanoseconds) {
2861 time_fields |= 1;
2862 sTime = skiNTPDecode(fixrec->flowStartNanoseconds, 0);
2863 log_rec_time.start_val = fixrec->flowStartNanoseconds;
2864 log_rec_time.start_name = "flowStartNanoseconds";
2865 } else if (record->bmap & TMPL_BIT_flowStartDeltaMicroseconds) {
2866 time_fields |= 9;
2867 sTime = (fBufGetExportTime(fbuf) * 1000
2868 - fixrec->flowStartDeltaMicroseconds / 1000);
2869 log_rec_time.start_val = fixrec->flowStartDeltaMicroseconds;
2870 log_rec_time.start_name = "flowStartDeltaMicroseconds";
2871 } else if (record->bmap & TMPL_BIT_flowStartSysUpTime) {
2872 /* must be additional time fields present that prevented
2873 * the switch from matching; mask those out and call this
2874 * function again */
2875 BMAP_TYPE bmap_orig = record->bmap;
2876 record->bmap = bmap_orig & (TMPL_BIT_flowStartSysUpTime
2877 | TMPL_BIT_systemInitTimeMilliseconds);
2878 TRACEMSG(1, ("Unusual time fields present on record"));
2879 skiGauntletOfTime(fbuf, record, probe);
2880 record->bmap = bmap_orig;
2881 return;
2882 } else {
2883 sTime = 0;
2884 }
2885
2886 /* look for all possible end times; if none found look for
2887 * collection/observation times */
2888 if (record->bmap & TMPL_BIT_flowEndMilliseconds) {
2889 time_fields |= 4;
2890 eTime = fixrec->flowEndMilliseconds;
2891 log_rec_time.end_val = fixrec->flowEndMilliseconds;
2892 log_rec_time.end_name = "flowEndMilliseconds";
2893 } else if (record->bmap & TMPL_BIT_flowEndSeconds) {
2894 time_fields |= 4;
2895 eTime = UINT64_C(1000) * (uint64_t)fixrec->flowEndSeconds;
2896 log_rec_time.end_val = fixrec->flowEndSeconds;
2897 log_rec_time.end_name = "flowEndSeconds";
2898 } else if (record->bmap & TMPL_BIT_flowEndMicroseconds) {
2899 time_fields |= 4;
2900 eTime = skiNTPDecode(fixrec->flowEndMicroseconds, 1);
2901 log_rec_time.end_val = fixrec->flowEndMicroseconds;
2902 log_rec_time.end_name = "flowEndMicroseconds";
2903 } else if (record->bmap & TMPL_BIT_flowEndNanoseconds) {
2904 time_fields |= 4;
2905 eTime = skiNTPDecode(fixrec->flowEndNanoseconds, 0);
2906 log_rec_time.end_val = fixrec->flowEndNanoseconds;
2907 log_rec_time.end_name = "flowEndNanoseconds";
2908 } else if (record->bmap & TMPL_BIT_flowEndDeltaMicroseconds) {
2909 time_fields |= 12;
2910 eTime = (fBufGetExportTime(fbuf) * 1000
2911 - fixrec->flowEndDeltaMicroseconds / 1000);
2912 log_rec_time.end_val = fixrec->flowEndDeltaMicroseconds;
2913 log_rec_time.end_name = "flowEndDeltaMicroseconds";
2914 } else if (record->bmap & TMPL_BIT_collectionTimeMilliseconds) {
2915 time_fields |= 4;
2916 eTime = fixrec->collectionTimeMilliseconds;
2917 log_rec_time.end_val = fixrec->collectionTimeMilliseconds;
2918 log_rec_time.end_name = "collectionTimeMilliseconds";
2919 } else if (record->bmap & TMPL_BIT_observationTimeMilliseconds) {
2920 time_fields |= 4;
2921 eTime = fixrec->observationTimeMilliseconds;
2922 log_rec_time.end_val = fixrec->observationTimeMilliseconds;
2923 log_rec_time.end_name = "observationTimeMilliseconds";
2924 } else if (record->bmap & TMPL_BIT_observationTimeSeconds) {
2925 time_fields |= 4;
2926 eTime = UINT64_C(1000) * (uint64_t)fixrec->observationTimeSeconds;
2927 log_rec_time.end_val = fixrec->observationTimeSeconds;
2928 log_rec_time.end_name = "observationTimeSeconds";
2929 } else if (record->bmap & TMPL_BIT_observationTimeMicroseconds) {
2930 time_fields |= 4;
2931 eTime = skiNTPDecode(fixrec->observationTimeMicroseconds, 1);
2932 log_rec_time.end_val = fixrec->observationTimeMicroseconds;
2933 log_rec_time.end_name = "observationTimeMicroseconds";
2934 } else if (record->bmap & TMPL_BIT_observationTimeNanoseconds) {
2935 time_fields |= 4;
2936 eTime = skiNTPDecode(fixrec->observationTimeNanoseconds, 0);
2937 log_rec_time.end_val = fixrec->observationTimeNanoseconds;
2938 log_rec_time.end_name = "observationTimeNanoseconds";
2939 } else {
2940 eTime = 0;
2941 }
2942
2943 /* look for durations */
2944 if (record->bmap & TMPL_BIT_flowDurationMilliseconds) {
2945 time_fields |= 2;
2946 duration = fixrec->flowDurationMilliseconds;
2947 log_rec_time.dur_val = fixrec->flowDurationMilliseconds;
2948 log_rec_time.dur_name = "flowDurationMilliseconds";
2949 } else if (record->bmap & TMPL_BIT_flowDurationMicroseconds) {
2950 time_fields |= 2;
2951 duration = fixrec->flowDurationMicroseconds / 1000;
2952 log_rec_time.dur_val = fixrec->flowDurationMicroseconds;
2953 log_rec_time.dur_name = "flowDurationMicroseconds";
2954 } else {
2955 duration = 0;
2956 }
2957
2958 /* set the time fields on the record */
2959 switch (time_fields) {
2960 case 3: case 7: case 11: case 15:
2961 /* have start and duration; use them; ignore eTime if
2962 * present */
2963 RECORD_SET_TIME_RETURN_NO_LOG(sTime, duration);
2964 break;
2965
2966 case 5: case 13:
2967 /* have start and end; use them */
2968 RECORD_SET_TIME_RETURN_NO_LOG(
2969 sTime, COMPUTE_DURATION(sTime, eTime));
2970 break;
2971
2972 case 1: case 9:
2973 /* only have a start time; use it and set dur to 0 */
2974 RECORD_SET_TIME_RETURN_NO_LOG(sTime, 0);
2975 break;
2976
2977 case 6: case 14:
2978 /* have dur and end time; compute start */
2979 sTime = (sktime_t)(eTime - duration);
2980 RECORD_SET_TIME_RETURN_NO_LOG(sTime, duration);
2981 break;
2982
2983 case 2:
2984 /* only have a duration; use export time as end time */
2985 sTime = sktimeCreate(fBufGetExportTime(fbuf),0) - duration;
2986 RECORD_SET_TIME_RETURN_NO_LOG(sTime, duration);
2987 break;
2988
2989 case 4: case 12:
2990 /* only have an end time; use it as start time and set dur
2991 * to 0 */
2992 RECORD_SET_TIME_RETURN_NO_LOG(eTime, 0);
2993 sTime = eTime;
2994 break;
2995
2996 case 0:
2997 /* no times, set start to export time and set dur to 0 */
2998 sTime = sktimeCreate(fBufGetExportTime(fbuf), 0);
2999 RECORD_SET_TIME_RETURN_NO_LOG(sTime, 0);
3000 break;
3001
3002 case 8: case 10:
3003 default:
3004 skAbortBadCase(time_fields);
3005 }
3006 /* close the default: clause */
3007 break;
3008 }
3009
3010 /* Should only get here when logging was requested */
3011 assert(skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS);
3012 sktimestamp_r(stime_buf, sTime, SKTIMESTAMP_UTC);
3013 switch (time_fields) {
3014 case 3:
3015 case 7:
3016 /* stime, duration, and maybe etime (which is ignored) */
3017 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3018 " %s=%" PRIu64 ", %s=%" PRIu64),
3019 skpcProbeGetName(probe), stime_buf,
3020 (double)rwRecGetElapsed(rec)/1000,
3021 log_rec_time.start_name,log_rec_time.start_val,
3022 log_rec_time.dur_name, log_rec_time.dur_val);
3023 break;
3024
3025 case 11:
3026 case 15:
3027 /* as above, with export time */
3028 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3029 " %s=%" PRIu64 ", %s=%" PRIu64 ", exportTimeSeconds=%" PRIu32),
3030 skpcProbeGetName(probe), stime_buf,
3031 (double)rwRecGetElapsed(rec)/1000,
3032 log_rec_time.start_name,log_rec_time.start_val,
3033 log_rec_time.dur_name, log_rec_time.dur_val,
3034 fBufGetExportTime(fbuf));
3035 break;
3036
3037 case 5:
3038 /* stime and etime */
3039 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3040 " %s=%" PRIu64 ", %s=%" PRIu64),
3041 skpcProbeGetName(probe), stime_buf,
3042 (double)rwRecGetElapsed(rec)/1000,
3043 log_rec_time.start_name,log_rec_time.start_val,
3044 log_rec_time.end_name, log_rec_time.end_val);
3045 break;
3046
3047 case 13:
3048 /* stime and etime, with export time */
3049 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3050 " %s=%" PRIu64 ", %s=%" PRIu64 ", exportTimeSeconds=%" PRIu32),
3051 skpcProbeGetName(probe), stime_buf,
3052 (double)rwRecGetElapsed(rec)/1000,
3053 log_rec_time.start_name,log_rec_time.start_val,
3054 log_rec_time.end_name, log_rec_time.end_val,
3055 fBufGetExportTime(fbuf));
3056 break;
3057
3058 case 1:
3059 /* stime only */
3060 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3061 " %s=%" PRIu64),
3062 skpcProbeGetName(probe), stime_buf, 0.0,
3063 log_rec_time.start_name,log_rec_time.start_val);
3064 break;
3065
3066 case 9:
3067 /* stime only with an export time */
3068 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3069 " %s=%" PRIu64 ", exportTimeSeconds=%" PRIu32),
3070 skpcProbeGetName(probe), stime_buf, 0.0,
3071 log_rec_time.start_name,log_rec_time.start_val,
3072 fBufGetExportTime(fbuf));
3073 break;
3074
3075 case 6:
3076 /* duration and end time */
3077 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3078 " %s=%" PRIu64 ", %s=%" PRIu64),
3079 skpcProbeGetName(probe), stime_buf,
3080 (double)rwRecGetElapsed(rec)/1000,
3081 log_rec_time.dur_name, log_rec_time.dur_val,
3082 log_rec_time.end_name, log_rec_time.end_val);
3083 break;
3084
3085 case 14:
3086 /* duration and end time, with an export time */
3087 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3088 " %s=%" PRIu64 ", %s=%" PRIu64 ", exportTimeSeconds=%" PRIu32),
3089 skpcProbeGetName(probe), stime_buf,
3090 (double)rwRecGetElapsed(rec)/1000,
3091 log_rec_time.dur_name, log_rec_time.dur_val,
3092 log_rec_time.end_name, log_rec_time.end_val,
3093 fBufGetExportTime(fbuf));
3094 break;
3095
3096 case 2:
3097 /* duration only */
3098 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3099 " %s=%" PRIu64 ", set end to exportTimeSeconds=%" PRIu32),
3100 skpcProbeGetName(probe), stime_buf,
3101 (double)rwRecGetElapsed(rec)/1000,
3102 log_rec_time.dur_name, log_rec_time.dur_val,
3103 fBufGetExportTime(fbuf));
3104 break;
3105
3106 case 4:
3107 /* end time only */
3108 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3109 " %s=%" PRIu64),
3110 skpcProbeGetName(probe), stime_buf, 0.0,
3111 log_rec_time.end_name, log_rec_time.end_val);
3112 break;
3113
3114 case 12:
3115 /* end time only, with export time */
3116 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
3117 " %s=%" PRIu64 ", exportTimeSeconds=%" PRIu32),
3118 skpcProbeGetName(probe), stime_buf, 0.0,
3119 log_rec_time.end_name, log_rec_time.end_val,
3120 fBufGetExportTime(fbuf));
3121 break;
3122
3123 case 0:
3124 /* no times */
3125 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs based on"
3126 " exportTimeSeconds=%" PRIu32),
3127 skpcProbeGetName(probe), stime_buf, 0.0,
3128 fBufGetExportTime(fbuf));
3129 break;
3130
3131 case 8:
3132 case 10:
3133 default:
3134 skAbortBadCase(time_fields);
3135 }
3136 }
3137
3138
3139 /**
3140 * Print a log message saying why a ski_fixrec_t record was ignored
3141 */
3142 static void
ski_fixrec_ignore(const ski_fixrec_t * fixrec,const char * reason)3143 ski_fixrec_ignore(
3144 const ski_fixrec_t *fixrec,
3145 const char *reason)
3146 {
3147 skipaddr_t ipaddr;
3148 char sipbuf[64];
3149 char dipbuf[64];
3150
3151 if (!SK_IPV6_IS_ZERO(fixrec->sourceIPv6Address)) {
3152 #ifdef SK_HAVE_INET_NTOP
3153 if (!inet_ntop(AF_INET6, &fixrec->sourceIPv6Address,
3154 sipbuf, sizeof(sipbuf)))
3155 #endif
3156 {
3157 strcpy(sipbuf, "unknown-v6");
3158 }
3159 } else {
3160 skipaddrSetV4(&ipaddr, &fixrec->sourceIPv4Address);
3161 skipaddrString(sipbuf, &ipaddr, SKIPADDR_CANONICAL);
3162 }
3163 if (!SK_IPV6_IS_ZERO(fixrec->destinationIPv6Address)) {
3164 #ifdef SK_HAVE_INET_NTOP
3165 if (!inet_ntop(AF_INET6, &fixrec->destinationIPv6Address,
3166 dipbuf, sizeof(dipbuf)))
3167 #endif
3168 {
3169 strcpy(dipbuf, "unknown-v6");
3170 }
3171 } else {
3172 skipaddrSetV4(&ipaddr, &fixrec->destinationIPv4Address);
3173 skipaddrString(dipbuf, &ipaddr, SKIPADDR_CANONICAL);
3174 }
3175
3176 INFOMSG(("IGNORED|%s|%s|%u|%u|%u|%" PRIu64 "|%" PRIu64 "|%s|"),
3177 sipbuf, dipbuf, fixrec->sourceTransportPort,
3178 fixrec->destinationTransportPort,fixrec->protocolIdentifier,
3179 ((fixrec->packetDeltaCount)
3180 ? fixrec->packetDeltaCount
3181 : ((fixrec->packetTotalCount)
3182 ? fixrec->packetTotalCount
3183 : fixrec->initiatorPackets)),
3184 ((fixrec->octetDeltaCount)
3185 ? fixrec->octetDeltaCount
3186 : ((fixrec->octetTotalCount)
3187 ? fixrec->octetTotalCount
3188 : fixrec->initiatorOctets)),
3189 reason);
3190 }
3191
3192
3193 /**
3194 * Call fBufNext() and transcode the data into the ski_fixrec_spec
3195 * template, then convert the structure into 0, 1, or 2 SiLK Flow
3196 * records and fill the record pointers on the 'record' structure.
3197 * The return value indicates the number of records converted.
3198 * Return -1 on failure.
3199 *
3200 * The reverse record is cleared via RWREC_CLEAR() when the return
3201 * value is 1.
3202 *
3203 * Return 0 if the IPFIX record should be ignored. The forward rec
3204 * will have been cleared; the reverse record is untouched. A
3205 * record can be ignored when (1)the record is IPv6 and SiLK is
3206 * compiled without IPv6 support, (2)the record has a packet and/or
3207 * byte count of 0, or (3)the record is explicitly marked as an
3208 * "intermediate" record by yaf.
3209 *
3210 * Return -1 on failure. The forward rec will have been cleared;
3211 * the reverse record is untouched.
3212 */
3213 static int
ski_fixrec_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)3214 ski_fixrec_next(
3215 fBuf_t *fbuf,
3216 ski_record_t *record,
3217 const skpc_probe_t *probe,
3218 GError **err)
3219 {
3220 fbSubTemplateMultiListEntry_t *stml;
3221 ski_fixrec_t *fixrec;
3222 size_t len;
3223 uint64_t pkts, bytes;
3224 uint64_t rev_pkts, rev_bytes;
3225 uint8_t tcp_state;
3226 uint8_t tcp_flags;
3227 int have_tcp_stml = 0;
3228 rwRec *fwd_rec;
3229
3230 DEFINE_PREFIX_BUF(prefix);
3231 makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
3232 fbSessionGetDomain(fBufGetSession(fbuf)),
3233 record->tid, record->tmpl);
3234 TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_fixrec_next()"),
3235 prefix, record->bmap));
3236 assert(SKI_RECTYPE_FIXREC == record->rectype);
3237
3238 /* Get a local handle to the record and clear it */
3239 fwd_rec = record->fwd_rec;
3240 RWREC_CLEAR(fwd_rec);
3241
3242 /* Set internal template to read an extended flow record */
3243 if (!fBufSetInternalTemplate(fbuf, SKI_FIXREC_TID, err)) {
3244 return -1;
3245 }
3246
3247 /* Get the next record */
3248 len = sizeof(record->data.fixrec);
3249 if (!fBufNext(fbuf, (uint8_t *)&record->data.fixrec, &len, err)) {
3250 return -1;
3251 }
3252 assert(len == sizeof(ski_fixrec_t));
3253 fixrec = &record->data.fixrec;
3254
3255 if ((fixrec->flowEndReason & SKI_END_MASK)== SKI_END_YAF_INTERMEDIATE_FLOW)
3256 {
3257 TRACEMSG(2, ("Ignored YAF intermediate uniflow"));
3258 return 0;
3259 }
3260
3261 /* Ignore records with no IPs. Ignore records that do not have
3262 * IPv4 addresses when SiLK was built without IPv6 support. */
3263 if (record->bmap & TMPL_BIT_sourceIPv4Address) {
3264 /* we're good */
3265 } else if (record->bmap & TMPL_BIT_sourceIPv6Address) {
3266 #if !SK_ENABLE_IPV6
3267 ski_fixrec_ignore(fixrec, "IPv6 record");
3268 return 0;
3269 #endif /* SK_ENABLE_IPV6 */
3270 } else if ((skpcProbeGetQuirks(probe) & SKPC_QUIRK_MISSING_IPS) == 0) {
3271 ski_fixrec_ignore(fixrec, "No IP addresses");
3272 return 0;
3273 }
3274
3275 if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_NF9_OUT_IS_REVERSE) {
3276 TRACEMSG(
3277 2, (("Setting reverse Octet/Packet counts (currently"
3278 " %" PRIu64 "/%" PRIu64 ") to post Octet/Packet counts"
3279 " (%" PRIu64 "/%" PRIu64 ") due to nf9-out-is-reverse"),
3280 fixrec->reverseOctetDeltaCount,fixrec->reversePacketDeltaCount,
3281 fixrec->postOctetDeltaCount, fixrec->postPacketDeltaCount));
3282 fixrec->reverseOctetDeltaCount = fixrec->postOctetDeltaCount;
3283 fixrec->reversePacketDeltaCount = fixrec->postPacketDeltaCount;
3284 fixrec->postOctetDeltaCount = fixrec->postPacketDeltaCount = 0;
3285 }
3286
3287 /* Get the forward and reverse packet and byte counts (run the
3288 * Gauntlet of Volume). */
3289 pkts = ((fixrec->packetDeltaCount)
3290 ? fixrec->packetDeltaCount
3291 : ((fixrec->packetTotalCount)
3292 ? fixrec->packetTotalCount
3293 : ((fixrec->initiatorPackets)
3294 ? fixrec->initiatorPackets
3295 : ((fixrec->postPacketDeltaCount)
3296 ? fixrec->postPacketDeltaCount
3297 : fixrec->postPacketTotalCount))));
3298 bytes = ((fixrec->octetDeltaCount)
3299 ? fixrec->octetDeltaCount
3300 : ((fixrec->octetTotalCount)
3301 ? fixrec->octetTotalCount
3302 : ((fixrec->initiatorOctets)
3303 ? fixrec->initiatorOctets
3304 : ((fixrec->postOctetDeltaCount)
3305 ? fixrec->postOctetDeltaCount
3306 : fixrec->postOctetTotalCount))));
3307
3308 /* I suppose we could add checks for
3309 * reversePost{Packet,Octet}{Delta,Total}Count here as well. */
3310 rev_pkts = ((fixrec->reversePacketDeltaCount)
3311 ? fixrec->reversePacketDeltaCount
3312 : ((fixrec->reversePacketTotalCount)
3313 ? fixrec->reversePacketTotalCount
3314 : fixrec->responderPackets));
3315 rev_bytes = ((fixrec->reverseOctetDeltaCount)
3316 ? fixrec->reverseOctetDeltaCount
3317 : ((fixrec->reverseOctetTotalCount)
3318 ? fixrec->reverseOctetTotalCount
3319 : fixrec->responderOctets));
3320
3321 /*
3322 * Handle records that represent a "firewall event" when the
3323 * SKPC_QUIRK_FW_EVENT quirks value is set on the probe. When
3324 * the quirk is not set, process the records normally.
3325 *
3326 * This code changed in SiLK 3.8.0. Prior to SiLK 3.8.0, all
3327 * firewall event status messages were dropped.
3328 *
3329 * It seems that every record from a Cisco ASA has
3330 * <strike>NF_F_FW_EVENT</strike> and NF_F_FW_EXT_EVENT
3331 * information elements, so ignoring flow records with these
3332 * elements means ignoring all flow records.
3333 *
3334 * It now (2015-June) seems that the NF_F_FW_EVENT information
3335 * element mentioned in the previous paragraph has been replaced
3336 * with firewallEvent (IE 233).
3337 *
3338 * firewallEvent is an official IPFIX information element, IE 233
3339 *
3340 * NF_F_FW_EVENT is Cisco IE 40005
3341 *
3342 * NF_F_FW_EXT_EVENT is Cisco IE 33002.
3343 *
3344 * Note that the Cisco IE numbers cannot be used in IPFIX because
3345 * IPFIX would treat them as "reverse" records.
3346 *
3347 * References (October 2013):
3348 * http://www.cisco.com/en/US/docs/security/asa/asa82/netflow/netflow.html#wp1028202
3349 * http://www.cisco.com/en/US/docs/security/asa/asa84/system/netflow/netflow.pdf
3350 *
3351 * Values for the NF_F_FW_EXT_EVENT depend on the values for the
3352 * firewallEvent or NF_F_FW_EVENT. The following lists the
3353 * FW_EVENT with sub-bullets for the NF_F_FW_EXT_EVENT.
3354 *
3355 * 0. Ignore -- This value indicates that a field must be
3356 * ignored.
3357 *
3358 * 0. Ignore -- This value indicates that the field must be
3359 * ignored.
3360 *
3361 * 1. Flow created -- This value indicates that a new flow was
3362 * created.
3363 *
3364 * 2. Flow deleted -- This value indicates that a flow was
3365 * deleted.
3366 *
3367 * >2000. Values above 2000 represent various reasons why a
3368 * flow was terminated.
3369 *
3370 * 3. Flow denied -- This value indicates that a flow was
3371 * denied.
3372 *
3373 * >1000. Values above 1000 represent various reasons why a
3374 * flow was denied.
3375 *
3376 * 1001. A flow was denied by an ingress ACL.
3377 *
3378 * 1002. A flow was denied by an egress ACL.
3379 *
3380 * 1003. The ASA denied an attempt to connect to the (ASA's)
3381 * interface service.
3382 *
3383 * 1004. The flow was denied because the first packet on the
3384 * TCP was not a TCP SYN packet.
3385 *
3386 * 5. Flow updated -- This value indicates that a flow update
3387 * timer went off or a flow was torn down.
3388 *
3389 * The IPFIX values for the firewallEvent IE follow those for
3390 * NF_F_FW_EVENT (with IPFIX providing no explanation as to what
3391 * the values mean! --- some standard) and IPFIX adds the value:
3392 *
3393 * 4. Flow alert.
3394 *
3395 * PROCESSING RULES:
3396 *
3397 * The term "ignore" below means that a log message is written
3398 * and that no SiLK flow record is created.
3399 *
3400 * Ignore flow records where the "flow ignore" event is present.
3401 *
3402 * Treat records where "flow deleted" is specified as actual flow
3403 * records to be processed and stored.
3404 *
3405 * Ignore "flow created" events, since we will handle these flows
3406 * when the "flow deleted" event occurs. Also, a short-lived
3407 * flow record may produce a "flow deleted" event without a "flow
3408 * created" event.
3409 *
3410 * For a "flow denied" event, write a special value into the SiLK
3411 * Flow record that the writing thread can use to categorize the
3412 * record as innull/outnull.
3413 *
3414 * It is unclear how to handle "flow updated" events. If the
3415 * record is only being updated, presumably SiLK will get a "flow
3416 * deleted" event in the future. However, if the flow is being
3417 * torn down, will the ASA send a separate "flow deleted" event?
3418 * For now (as of SiLK 3.8.0), ignore "flow updated" events.
3419 *
3420 * Ignore "flow alert" events.
3421 *
3422 *
3423 * Firewall events, byte and packet counts, and the Cisco ASA:
3424 *
3425 * 1. Flow created events have a byte and packet count of 0;
3426 * this is fine since we are ignoring these flows.
3427 *
     * 2. Flow denied events have a byte and packet count of 0.
3429 * SiLK will ignore these flows unless we doctor them to have a
3430 * non-zero byte and packet count, which we do when the ASA hack
3431 * is enabled.
3432 *
3433 * 3. Flow deleted events have a packet count of 0, but we have
3434 * code below to work around that when the ASA hack is enabled.
     * The flows usually have a non-zero byte count. However, some
3436 * flow records have a 0-byte count, and (July 2015) we have been
3437 * told one source of these records are packets to an un-opened
     * port. Previously these flows were ignored, but as of SiLK
3439 * 3.11.0 we doctor the records to have a byte count of 1.
3440 */
3441 if ((skpcProbeGetQuirks(probe) & SKPC_QUIRK_FW_EVENT)
3442 && (record->bmap & (TMPL_BIT_firewallEvent | TMPL_BIT_NF_F_FW_EVENT
3443 | TMPL_BIT_NF_F_FW_EXT_EVENT)))
3444 {
3445 char msg[64];
3446 uint8_t event = (fixrec->firewallEvent
3447 ? fixrec->firewallEvent : fixrec->NF_F_FW_EVENT);
3448 if (SKIPFIX_FW_EVENT_DELETED == event) {
3449 /* flow deleted */
3450 TRACEMSG(1,(("Processing flow deleted event as actual flow record;"
3451 " firewallEvent=%u, NF_F_FW_EVENT=%u,"
3452 " NF_F_FW_EXT_EVENT=%u"),
3453 fixrec->firewallEvent, fixrec->NF_F_FW_EVENT,
3454 fixrec->NF_F_FW_EXT_EVENT));
3455 /* these normally have a byte count, but not always */
3456 if (0 == bytes) {
3457 if (0 == pkts) {
3458 TRACEMSG(1, ("Setting forward bytes and packets to 1"
3459 " for deleted firewall event"));
3460 bytes = 1;
3461 pkts = 1;
3462 } else {
3463 TRACEMSG(1, ("Setting forward bytes equal to packets value"
3464 " for deleted firewall event"));
3465 bytes = pkts;
3466 }
3467 } else {
3468 /* there is a forward byte count */
3469 if (0 == pkts) {
3470 TRACEMSG(1, ("Setting forward packets to 1"));
3471 pkts = 1;
3472 }
3473 if (rev_bytes) {
3474 /* there is a reverse byte count */
3475 if (0 == rev_pkts) {
3476 TRACEMSG(1, ("Setting reverse packets to 1"));
3477 rev_pkts = 1;
3478 }
3479 }
3480 }
3481
3482 } else if (SKIPFIX_FW_EVENT_DENIED == event) {
3483 /* flow denied */
3484 TRACEMSG(1, (("Processing flow denied event as actual flow record;"
3485 " firewallEvent=%u, NF_F_FW_EVENT=%u,"
3486 " NF_F_FW_EXT_EVENT=%u"),
3487 fixrec->firewallEvent, fixrec->NF_F_FW_EVENT,
3488 fixrec->NF_F_FW_EXT_EVENT));
3489 if (SKIPFIX_FW_EVENT_DENIED_CHECK_VALID(fixrec->NF_F_FW_EXT_EVENT))
3490 {
3491 rwRecSetMemo(fwd_rec, fixrec->NF_F_FW_EXT_EVENT);
3492 } else {
3493 rwRecSetMemo(fwd_rec, event);
3494 }
3495 /* flow denied events from the Cisco ASA have zero in the
3496 * bytes and packets field */
3497 if (0 == pkts) {
3498 TRACEMSG(1, ("Setting forward bytes and packets to 1"
3499 " for denied firewall event"));
3500 bytes = 1;
3501 pkts = 1;
3502 } else if (0 == bytes) {
3503 TRACEMSG(1, ("Setting forward bytes equal to packets value"
3504 " for denied firewall event"));
3505 bytes = pkts;
3506 }
3507
3508 } else {
3509 /* flow created, flow updated, flow alert, or something
3510 * unexpected */
3511 if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_FIREWALL) {
3512 snprintf(msg, sizeof(msg), "firewallEvent=%u,extended=%u",
3513 event, fixrec->NF_F_FW_EXT_EVENT);
3514 ski_fixrec_ignore(fixrec, msg);
3515 }
3516 return 0;
3517 }
3518 }
3519
3520 /* FIXME. What if the record has a flowDirection field that is
3521 * set to egress (0x01)? Shouldn't we handle that by reversing
3522 * the record? Or has fixbuf done that for us? */
3523
3524 if (0 == bytes && 0 == rev_bytes) {
3525 #if 0
3526 /* flow denied events from the Cisco ASA have zero in the
3527 * bytes and packets field */
3528 if ((skpcProbeGetQuirks(probe) & SKPC_QUIRK_FW_EVENT)
3529 && 0 == pkts
3530 && (SKIPFIX_FW_EVENT_DENIED == fixrec->NF_F_FW_EVENT
3531 || SKIPFIX_FW_EVENT_DENIED == fixrec->firewallEvent))
3532 {
3533 TRACEMSG(1, ("Setting forward bytes and packets to 1"
3534 " for denied firewall event"));
3535 bytes = 1;
3536 pkts = 1;
3537 } else
3538 #endif /* 0 */
3539 {
3540 ski_fixrec_ignore(fixrec, "no forward/reverse octets");
3541 return 0;
3542 }
3543 }
3544
3545 if (0 == pkts && 0 == rev_pkts) {
3546 if ((skpcProbeGetQuirks(probe) & SKPC_QUIRK_ZERO_PACKETS) == 0) {
3547 /* Ignore records with no volume. */
3548 ski_fixrec_ignore(fixrec, "no forward/reverse packets");
3549 return 0;
3550 }
3551
3552 /* attempt to handle NetFlowV9 records from an ASA router that
3553 * have no packet count. The code assumes all records from an
3554 * ASA have a byte count, though this is not always true. */
3555 if (bytes) {
3556 /* there is a forward byte count */
3557 if (0 == pkts) {
3558 TRACEMSG(1, ("Setting forward packets to 1"));
3559 pkts = 1;
3560 }
3561 }
3562 if (rev_bytes) {
3563 /* there is a reverse byte count */
3564 if (0 == rev_pkts) {
3565 TRACEMSG(1, ("Setting reverse packets to 1"));
3566 rev_pkts = 1;
3567 }
3568 }
3569 }
3570
3571 /* If the TCP flags are in a subTemplateMultiList, copy them from
3572 * the list and into the record. The fixbuf.stml gets initialized
3573 * by the call to fBufNext().*/
3574 stml = NULL;
3575 while ((stml = fbSubTemplateMultiListGetNextEntry(&fixrec->stml, stml))) {
3576 if (SKI_TCP_STML_TID != stml->tmplID) {
3577 fbSubTemplateMultiListEntryNextDataPtr(stml, NULL);
3578 } else {
3579 ski_tcp_stml_t *tcp = NULL;
3580 tcp = ((ski_tcp_stml_t*)
3581 fbSubTemplateMultiListEntryNextDataPtr(stml, tcp));
3582 fixrec->initialTCPFlags = tcp->initialTCPFlags;
3583 fixrec->unionTCPFlags = tcp->unionTCPFlags;
3584 fixrec->reverseInitialTCPFlags = tcp->reverseInitialTCPFlags;
3585 fixrec->reverseUnionTCPFlags = tcp->reverseUnionTCPFlags;
3586 have_tcp_stml = 1;
3587 }
3588 }
3589 fbSubTemplateMultiListClear(&fixrec->stml);
3590
3591 if (pkts && bytes) {
3592 /* We have forward information. */
3593 TRACEMSG(1, ("Read a forward fixrec record"));
3594
3595 /* Handle the IP addresses */
3596 #if SK_ENABLE_IPV6
3597 /* Use the IPv6 addresses if they are present and either there
3598 * are no IPv4 addresses or the IPv6 addresses are non-zero. */
3599 if ((record->bmap & TMPL_BIT_sourceIPv6Address)
3600 && (!(record->bmap & TMPL_BIT_sourceIPv4Address)
3601 || !SK_IPV6_IS_ZERO(fixrec->sourceIPv6Address)
3602 || !SK_IPV6_IS_ZERO(fixrec->destinationIPv6Address)))
3603 {
3604 /* Values found in IPv6 addresses--use them */
3605 rwRecSetIPv6(fwd_rec);
3606 rwRecMemSetSIPv6(fwd_rec, &fixrec->sourceIPv6Address);
3607 rwRecMemSetDIPv6(fwd_rec, &fixrec->destinationIPv6Address);
3608 rwRecMemSetNhIPv6(fwd_rec, &fixrec->ipNextHopIPv6Address);
3609 } else
3610 #endif /* SK_ENABLE_IPV6 */
3611 {
3612 /* Take values from IPv4 */
3613 rwRecSetSIPv4(fwd_rec, fixrec->sourceIPv4Address);
3614 rwRecSetDIPv4(fwd_rec, fixrec->destinationIPv4Address);
3615 rwRecSetNhIPv4(fwd_rec, fixrec->ipNextHopIPv4Address);
3616 }
3617
3618 /* Handle the Protocol and Ports */
3619 rwRecSetProto(fwd_rec, fixrec->protocolIdentifier);
3620
3621 if (!rwRecIsICMP(fwd_rec)
3622 || (!(record->bmap & (TMPL_BIT_icmpTypeCodeIPv4
3623 | TMPL_BIT_icmpTypeIPv4))))
3624 {
3625 rwRecSetSPort(fwd_rec, fixrec->sourceTransportPort);
3626 rwRecSetDPort(fwd_rec, fixrec->destinationTransportPort);
3627
3628 } else if (record->bmap & TMPL_BIT_icmpTypeCodeIPv4) {
3629 rwRecSetSPort(fwd_rec, 0);
3630 #if SK_ENABLE_IPV6
3631 if (rwRecIsIPv6(fwd_rec)) {
3632 rwRecSetDPort(fwd_rec, fixrec->icmpTypeCodeIPv6);
3633 } else
3634 #endif /* SK_ENABLE_IPV6 */
3635 {
3636 rwRecSetDPort(fwd_rec, fixrec->icmpTypeCodeIPv4);
3637 }
3638
3639 } else if (record->bmap & TMPL_BIT_icmpTypeIPv4) {
3640 /* record has at least one of: icmpTypeIPv4 icmpCodeIPv4,
3641 * icmpTypeIPv6, icmpCodeIPv6 */
3642 rwRecSetSPort(fwd_rec, 0);
3643 #if SK_ENABLE_IPV6
3644 if (rwRecIsIPv6(fwd_rec)) {
3645 rwRecSetDPort(fwd_rec, ((fixrec->icmpTypeIPv6 << 8)
3646 | fixrec->icmpCodeIPv6));
3647 } else
3648 #endif /* SK_ENABLE_IPV6 */
3649 {
3650 rwRecSetDPort(fwd_rec, ((fixrec->icmpTypeIPv4 << 8)
3651 | fixrec->icmpCodeIPv4));
3652 }
3653 } else {
3654 skAbort();
3655 }
3656
3657 /* Handle the SNMP or VLAN interfaces */
3658 if (SKPC_IFVALUE_SNMP == skpcProbeGetInterfaceValueType(probe)) {
3659 rwRecSetInput(fwd_rec, CLAMP_VAL16(fixrec->ingressInterface));
3660 rwRecSetOutput(fwd_rec, CLAMP_VAL16(fixrec->egressInterface));
3661 } else {
3662 rwRecSetInput(fwd_rec, fixrec->vlanId);
3663 rwRecSetOutput(fwd_rec, fixrec->postVlanId);
3664 }
3665
3666 /* Store volume, clamping counts to 32 bits. */
3667 rwRecSetPkts(fwd_rec, CLAMP_VAL32(pkts));
3668 rwRecSetBytes(fwd_rec, CLAMP_VAL32(bytes));
3669
3670 } else if (rev_pkts && rev_bytes) {
3671 /* We have no forward information, only reverse. Write the
3672 * source and dest values from the IPFIX record to SiLK's dest
3673 * and source fields, respectively. */
3674 TRACEMSG(1, ("Read a reverse-only fixrec record"));
3675
3676 /* Store volume, clamping counts to 32 bits. */
3677 rwRecSetPkts(fwd_rec, CLAMP_VAL32(rev_pkts));
3678 rwRecSetBytes(fwd_rec, CLAMP_VAL32(rev_bytes));
3679
3680 /* This cannot be a bi-flow. Clear rev_pkts and rev_bytes
3681 * variables now. We check this in the rev_rec code
3682 * below. */
3683 rev_pkts = rev_bytes = 0;
3684
3685 /* Handle the IP addresses */
3686 #if SK_ENABLE_IPV6
3687 if ((record->bmap & TMPL_BIT_sourceIPv6Address)
3688 && (!(record->bmap & TMPL_BIT_sourceIPv4Address)
3689 || !SK_IPV6_IS_ZERO(fixrec->sourceIPv6Address)
3690 || !SK_IPV6_IS_ZERO(fixrec->destinationIPv6Address)))
3691 {
3692 /* Values found in IPv6 addresses--use them */
3693 rwRecSetIPv6(fwd_rec);
3694 rwRecMemSetSIPv6(fwd_rec, &fixrec->destinationIPv6Address);
3695 rwRecMemSetDIPv6(fwd_rec, &fixrec->sourceIPv6Address);
3696 rwRecMemSetNhIPv6(fwd_rec, &fixrec->ipNextHopIPv6Address);
3697 } else
3698 #endif /* SK_ENABLE_IPV6 */
3699 {
3700 /* Take values from IPv4 */
3701 rwRecSetSIPv4(fwd_rec, fixrec->destinationIPv4Address);
3702 rwRecSetDIPv4(fwd_rec, fixrec->sourceIPv4Address);
3703 rwRecSetNhIPv4(fwd_rec, fixrec->ipNextHopIPv4Address);
3704 }
3705
3706 /* Handle the Protocol and Ports */
3707 rwRecSetProto(fwd_rec, fixrec->protocolIdentifier);
3708 if (!rwRecIsICMP(fwd_rec)) {
3709 rwRecSetSPort(fwd_rec, fixrec->destinationTransportPort);
3710 rwRecSetDPort(fwd_rec, fixrec->sourceTransportPort);
3711 } else if (record->bmap & TMPL_BIT_icmpTypeCodeIPv4) {
3712 rwRecSetSPort(fwd_rec, 0);
3713 #if SK_ENABLE_IPV6
3714 if (rwRecIsIPv6(fwd_rec)) {
3715 rwRecSetDPort(fwd_rec, fixrec->icmpTypeCodeIPv6);
3716 } else
3717 #endif /* SK_ENABLE_IPV6 */
3718 {
3719 rwRecSetDPort(fwd_rec, fixrec->icmpTypeCodeIPv4);
3720 }
3721 } else if (record->bmap & TMPL_BIT_icmpTypeIPv4) {
3722 /* record has at least one of: icmpTypeIPv4 icmpCodeIPv4,
3723 * icmpTypeIPv6, icmpCodeIPv6 */
3724 rwRecSetSPort(fwd_rec, 0);
3725 #if SK_ENABLE_IPV6
3726 if (rwRecIsIPv6(fwd_rec)) {
3727 rwRecSetDPort(fwd_rec, ((fixrec->icmpTypeIPv6 << 8)
3728 | fixrec->icmpCodeIPv6));
3729 } else
3730 #endif /* SK_ENABLE_IPV6 */
3731 {
3732 rwRecSetDPort(fwd_rec, ((fixrec->icmpTypeIPv4 << 8)
3733 | fixrec->icmpCodeIPv4));
3734 }
3735 } else {
3736 /* For an ICMP record, put whichever Port field is
3737 * non-zero into the record's dPort field */
3738 rwRecSetSPort(fwd_rec, 0);
3739 rwRecSetDPort(fwd_rec, (fixrec->destinationTransportPort
3740 ? fixrec->destinationTransportPort
3741 : fixrec->sourceTransportPort));
3742 }
3743
3744 /* Handle the SNMP or VLAN interfaces */
3745 if (SKPC_IFVALUE_SNMP == skpcProbeGetInterfaceValueType(probe)) {
3746 rwRecSetInput(fwd_rec, CLAMP_VAL16(fixrec->egressInterface));
3747 rwRecSetOutput(fwd_rec, CLAMP_VAL16(fixrec->ingressInterface));
3748 } else {
3749 if (record->bmap & TMPL_BIT_reverseVlanId) {
3750 /* If we have the reverse elements, use them */
3751 rwRecSetInput(fwd_rec, fixrec->reverseVlanId);
3752 rwRecSetOutput(fwd_rec, fixrec->reversePostVlanId);
3753 } else if (record->bmap & TMPL_BIT_postVlanId) {
3754 /* If we have a single vlanId, set 'input' to that value;
3755 * otherwise, set 'input' to postVlanId and 'output' to
3756 * vlanId. */
3757 rwRecSetInput(fwd_rec, fixrec->postVlanId);
3758 rwRecSetOutput(fwd_rec, fixrec->vlanId);
3759 } else {
3760 /* we have a single vlanId, so don't swap the values */
3761 rwRecSetInput(fwd_rec, fixrec->vlanId);
3762 }
3763 }
3764
3765 } else {
3766 TRACEMSG(2, (("Found zero bytes or packets; byte=%" PRIu64 ", pkt="
3767 "%" PRIu64 ", rev_byte=%" PRIu64 ", rev_pkt=%" PRIu64),
3768 bytes, pkts, rev_bytes, rev_pkts));
3769 ski_fixrec_ignore(fixrec, "byte or packet count is zero");
3770 return 0;
3771 }
3772
3773 skiGauntletOfTime(fbuf, record, probe);
3774
3775 /* Copy the remainder of the record */
3776 rwRecSetFlowType(fwd_rec, fixrec->silkFlowType);
3777 rwRecSetSensor(fwd_rec, fixrec->silkFlowSensor);
3778 rwRecSetApplication(fwd_rec, fixrec->silkAppLabel);
3779
3780 tcp_state = fixrec->silkTCPState;
3781 tcp_flags = (fixrec->initialTCPFlags | fixrec->unionTCPFlags);
3782
3783 /* Ensure the SK_TCPSTATE_EXPANDED bit is properly set. */
3784 if (tcp_flags && IPPROTO_TCP == rwRecGetProto(fwd_rec)) {
3785 /* Flow is TCP and init|session flags had a value. */
3786 rwRecSetFlags(fwd_rec, tcp_flags);
3787 rwRecSetInitFlags(fwd_rec, fixrec->initialTCPFlags);
3788 rwRecSetRestFlags(fwd_rec, fixrec->unionTCPFlags);
3789 tcp_state |= SK_TCPSTATE_EXPANDED;
3790 } else {
3791 /* clear bit when not TCP or no separate init/session flags */
3792 tcp_state &= ~SK_TCPSTATE_EXPANDED;
3793 /* use whatever all-flags we were given; leave initial-flags
3794 * and session-flags unset */
3795 rwRecSetFlags(fwd_rec, fixrec->tcpControlBits);
3796 }
3797
3798 /* Process the flowEndReason and flowAttributes unless one of
3799 * those bits is already set (via silkTCPState). */
3800 if (!(tcp_state
3801 & (SK_TCPSTATE_FIN_FOLLOWED_NOT_ACK | SK_TCPSTATE_TIMEOUT_KILLED
3802 | SK_TCPSTATE_TIMEOUT_STARTED | SK_TCPSTATE_UNIFORM_PACKET_SIZE)))
3803 {
3804 /* Note active timeout */
3805 if ((fixrec->flowEndReason & SKI_END_MASK) == SKI_END_ACTIVE) {
3806 tcp_state |= SK_TCPSTATE_TIMEOUT_KILLED;
3807 }
3808 /* Note continuation */
3809 if (fixrec->flowEndReason & SKI_END_ISCONT) {
3810 tcp_state |= SK_TCPSTATE_TIMEOUT_STARTED;
3811 }
3812 /* Note flows with records of uniform size */
3813 if (fixrec->flowAttributes & SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE) {
3814 tcp_state |= SK_TCPSTATE_UNIFORM_PACKET_SIZE;
3815 }
3816 rwRecSetTcpState(fwd_rec, tcp_state);
3817 }
3818
3819 rwRecSetTcpState(fwd_rec, tcp_state);
3820
3821 /* Handle the reverse record if there is one in the IPFIX record,
3822 * which is indicated by the value of 'rev_bytes'.*/
3823 if (0 == rev_bytes) {
3824 /* No data for reverse direction; just clear the record. */
3825 RWREC_CLEAR(record->rev_rec);
3826 } else {
3827 rwRec *rev_rec;
3828 rev_rec = record->rev_rec;
3829
3830 /* We have data for reverse direction. */
3831 TRACEMSG(1, ("Handling reverse side of bi-flow fixrec record"));
3832
3833 #define COPY_FORWARD_REC_TO_REVERSE 1
3834 #if COPY_FORWARD_REC_TO_REVERSE
3835 /* Initialize the reverse record with the forward
3836 * record */
3837 RWREC_COPY(rev_rec, fwd_rec);
3838 #else
3839 /* instead of copying the forward record and changing
3840 * nearly everything, we could just set these fields on
3841 * the reverse record. */
3842 rwRecSetProto(rev_rec, fixrec->protocolIdentifier);
3843 rwRecSetFlowType(rev_rec, fixrec->silkFlowType);
3844 rwRecSetSensor(rev_rec, fixrec->silkFlowSensor);
3845 rwRecSetTcpState(rev_rec, fixrec->silkTCPState);
3846 rwRecSetApplication(rev_rec, fixrec->silkAppLabel);
3847 /* does using the forward nexthop IP for the reverse
3848 * record make any sense? Shouldn't we check for a
3849 * reverse next hop address? */
3850 #if SK_ENABLE_IPV6
3851 if (rwRecIsIPv6(fwd_rec)) {
3852 rwRecSetIPv6(rev_rec);
3853 rwRecMemSetNhIPv6(rev_rec, &fixrec->ipNextHopIPv6Address);
3854 } else
3855 #endif
3856 {
3857 rwRecSetNhIPv4(rev_rec, &fixrec->ipNextHopIPv4Address);
3858 }
3859 #endif /* #else clause of #if COPY_FORWARD_REC_TO_REVERSE */
3860
3861 /* Reverse the IPs */
3862 #if SK_ENABLE_IPV6
3863 if (rwRecIsIPv6(fwd_rec)) {
3864 rwRecMemSetSIPv6(rev_rec, &fixrec->destinationIPv6Address);
3865 rwRecMemSetDIPv6(rev_rec, &fixrec->sourceIPv6Address);
3866 } else
3867 #endif
3868 {
3869 rwRecSetSIPv4(rev_rec, fixrec->destinationIPv4Address);
3870 rwRecSetDIPv4(rev_rec, fixrec->sourceIPv4Address);
3871 }
3872
3873 /* Reverse the ports unless this is an ICMP record */
3874 if (!rwRecIsICMP(fwd_rec)) {
3875 rwRecSetSPort(rev_rec, rwRecGetDPort(fwd_rec));
3876 rwRecSetDPort(rev_rec, rwRecGetSPort(fwd_rec));
3877 }
3878
3879 /* Reverse the SNMP or VLAN interfaces */
3880 if (SKPC_IFVALUE_SNMP == skpcProbeGetInterfaceValueType(probe)) {
3881 rwRecSetInput(rev_rec, rwRecGetOutput(fwd_rec));
3882 rwRecSetOutput(rev_rec, rwRecGetInput(fwd_rec));
3883 } else if (record->bmap & TMPL_BIT_reverseVlanId) {
3884 /* Reverse VLAN values exist. Use them */
3885 rwRecSetInput(rev_rec, fixrec->reverseVlanId);
3886 rwRecSetOutput(rev_rec, fixrec->reversePostVlanId);
3887 } else if (record->bmap & TMPL_BIT_postVlanId) {
3888 /* Reverse the forward values */
3889 rwRecSetInput(rev_rec, fixrec->postVlanId);
3890 rwRecSetOutput(rev_rec, fixrec->vlanId);
3891 } else {
3892 /* we have a single vlanId, so don't swap the values */
3893 rwRecSetInput(rev_rec, fixrec->vlanId);
3894 }
3895
3896 /* Set volume. We retrieved them above */
3897 rwRecSetPkts(rev_rec, CLAMP_VAL32(rev_pkts));
3898 rwRecSetBytes(rev_rec, CLAMP_VAL32(rev_bytes));
3899
3900 /* Calculate reverse start time from reverse RTT */
3901
3902 /* Reverse flow's start time must be increased and its
3903 * duration decreased by its offset from the forward
3904 * record */
3905 rwRecSetStartTime(rev_rec, (rwRecGetStartTime(fwd_rec)
3906 + fixrec->reverseFlowDeltaMilliseconds));
3907 rwRecSetElapsed(rev_rec, (rwRecGetElapsed(fwd_rec)
3908 - fixrec->reverseFlowDeltaMilliseconds));
3909
3910 /* Note: the value of the 'tcp_state' variable from above is
3911 * what is in rwRecGetTcpState(rev_rec). */
3912
3913 /* Get reverse TCP flags from the IPFIX record if they are
3914 * available. Otherwise, leave the flags unchanged (using
3915 * those from the forward direction). */
3916 tcp_flags = (fixrec->reverseInitialTCPFlags
3917 | fixrec->reverseUnionTCPFlags);
3918
3919 if (tcp_flags && IPPROTO_TCP == rwRecGetProto(fwd_rec)) {
3920 /* Flow is TCP and init|session has a value. */
3921 TRACEMSG(2, ("Using reverse TCP flags (initial|session)"));
3922 rwRecSetFlags(rev_rec, tcp_flags);
3923 rwRecSetInitFlags(rev_rec, fixrec->reverseInitialTCPFlags);
3924 rwRecSetRestFlags(rev_rec, fixrec->reverseUnionTCPFlags);
3925 tcp_state |= SK_TCPSTATE_EXPANDED;
3926 } else if (record->bmap & TMPL_BIT_reverseTcpControlBits) {
3927 /* Use whatever is in all-flags; clear any init/session
3928 * flags we got from the forward rec. */
3929 TRACEMSG(2, ("Using reverse TCP flags (all only)"));
3930 rwRecSetFlags(rev_rec, fixrec->reverseTcpControlBits);
3931 rwRecSetInitFlags(rev_rec, 0);
3932 rwRecSetRestFlags(rev_rec, 0);
3933 tcp_state &= ~SK_TCPSTATE_EXPANDED;
3934 } else if (have_tcp_stml
3935 || (record->bmap & TMPL_BIT_reverseInitialTCPFlags))
3936 {
3937 /* If a reverseInitialTCPFlags Element existed on the
3938 * template; use it even though its value is 0. */
3939 TRACEMSG(2, ("Setting all TCP flags to 0"));
3940 rwRecSetFlags(rev_rec, 0);
3941 rwRecSetInitFlags(rev_rec, 0);
3942 rwRecSetRestFlags(rev_rec, 0);
3943 tcp_state &= ~SK_TCPSTATE_EXPANDED;
3944 }
3945 /* else leave the flags unchanged */
3946
3947 /* Handle reverse flow attributes */
3948 if (fixrec->reverseFlowAttributes
3949 & SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE)
3950 {
3951 /* ensure it is set */
3952 tcp_state |= SK_TCPSTATE_UNIFORM_PACKET_SIZE;
3953 } else {
3954 /* ensure it it not set */
3955 tcp_state &= ~SK_TCPSTATE_UNIFORM_PACKET_SIZE;
3956 }
3957
3958 rwRecSetTcpState(rev_rec, tcp_state);
3959 }
3960
3961
3962 /* all done */
3963 return ((rev_bytes > 0) ? 2 : 1);
3964 }
3965
3966
3967
3968
3969 /**
3970 * Print a log message saying why a ski_yafrec_t record was ignored
3971 */
3972 static void
ski_yafrec_ignore(const ski_yafrec_t * yafrec,const char * reason)3973 ski_yafrec_ignore(
3974 const ski_yafrec_t *yafrec,
3975 const char *reason)
3976 {
3977 skipaddr_t ipaddr;
3978 char sipbuf[64];
3979 char dipbuf[64];
3980
3981 if (!SK_IPV6_IS_ZERO(yafrec->sourceIPv6Address)) {
3982 #ifdef SK_HAVE_INET_NTOP
3983 if (!inet_ntop(AF_INET6, &yafrec->sourceIPv6Address,
3984 sipbuf, sizeof(sipbuf)))
3985 #endif
3986 {
3987 strcpy(sipbuf, "unknown-v6");
3988 }
3989 } else {
3990 skipaddrSetV4(&ipaddr, &yafrec->sourceIPv4Address);
3991 skipaddrString(sipbuf, &ipaddr, SKIPADDR_CANONICAL);
3992 }
3993 if (!SK_IPV6_IS_ZERO(yafrec->destinationIPv6Address)) {
3994 #ifdef SK_HAVE_INET_NTOP
3995 if (!inet_ntop(AF_INET6, &yafrec->destinationIPv6Address,
3996 dipbuf, sizeof(dipbuf)))
3997 #endif
3998 {
3999 strcpy(dipbuf, "unknown-v6");
4000 }
4001 } else {
4002 skipaddrSetV4(&ipaddr, &yafrec->destinationIPv4Address);
4003 skipaddrString(dipbuf, &ipaddr, SKIPADDR_CANONICAL);
4004 }
4005
4006 INFOMSG(("IGNORED|%s|%s|%u|%u|%u|%" PRIu64 "|%" PRIu64 "|%s|"),
4007 sipbuf, dipbuf, yafrec->sourceTransportPort,
4008 yafrec->destinationTransportPort,yafrec->protocolIdentifier,
4009 yafrec->packetDeltaCount, yafrec->octetDeltaCount, reason);
4010 }
4011
4012
4013 /**
4014 * Call fBufNext() and transcode the data into one of the
4015 * ski_yafrec_spec templates, and then convert the structure into
4016 * 0, 1, or 2 SiLK Flow records and fill the record pointers on the
4017 * 'record' structure. The return value indicates the number of
4018 * records converted. Return -1 on failure.
4019 */
4020 static int
ski_yafrec_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)4021 ski_yafrec_next(
4022 fBuf_t *fbuf,
4023 ski_record_t *record,
4024 const skpc_probe_t *probe,
4025 GError **err)
4026 {
4027 fbSubTemplateMultiListEntry_t *stml;
4028 ski_yafrec_t *yafrec;
4029 size_t len;
4030 uint8_t tcp_state;
4031 int have_tcp_stml = 0;
4032 uint16_t int_tid;
4033 rwRec *fwd_rec;
4034 rwRec *rev_rec;
4035
4036 DEFINE_PREFIX_BUF(prefix);
4037 makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
4038 fbSessionGetDomain(fBufGetSession(fbuf)),
4039 record->tid, record->tmpl);
4040 TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_yafrec_next()"),
4041 prefix, record->bmap));
4042 assert(SKI_RECTYPE_YAFREC == record->rectype);
4043
4044 /* Get a local handle to the record and clear it */
4045 fwd_rec = record->fwd_rec;
4046 RWREC_CLEAR(fwd_rec);
4047
4048 /* The lower 16 bits of the context is the TID of the template to
4049 * use to read the record. */
4050 int_tid = record->bmap & UINT16_MAX;
4051 if ((int_tid & SKI_YAFREC_TID) != SKI_YAFREC_TID) {
4052 TRACEMSG(1, ("ski_yafrec_next() called but TID %#06x does not match",
4053 int_tid));
4054 return ski_ignore_next(fbuf, record, probe, err);
4055 }
4056 if (!fBufSetInternalTemplate(fbuf, int_tid, err)) {
4057 TRACEMSG(1, (("ski_yafrec_next() called but setting Template"
4058 " TID %#06x failed: %s"), int_tid, (*err)->message));
4059 g_clear_error(err);
4060 return ski_ignore_next(fbuf, record, probe, err);
4061 }
4062 len = sizeof(record->data.yafrec);
4063 if (!fBufNext(fbuf, (uint8_t *)&record->data.yafrec, &len, err)) {
4064 return -1;
4065 }
4066 yafrec = &record->data.yafrec;
4067 assert((record->bmap & YAFREC_STML)
4068 ? (len == sizeof(ski_yafrec_t))
4069 : (len == offsetof(ski_yafrec_t, stml)));
4070
4071 if ((yafrec->flowEndReason & SKI_END_MASK)== SKI_END_YAF_INTERMEDIATE_FLOW)
4072 {
4073 TRACEMSG(2, ("Ignored YAF intermediate uniflow"));
4074 return 0;
4075 }
4076
4077 /* Ignore records that do not have IPv4 addresses when SiLK was
4078 * built without IPv6 support. */
4079 #if !SK_ENABLE_IPV6
4080 if (record->bmap & YAFREC_ONLY_IP6) {
4081 ski_yafrec_ignore(yafrec, "IPv6 record");
4082 return 0;
4083 }
4084 #endif /* SK_ENABLE_IPV6 */
4085
4086 /* Volume */
4087 if (yafrec->packetDeltaCount && yafrec->octetDeltaCount) {
4088 /* Store volume, clamping counts to 32 bits. */
4089 rwRecSetPkts(fwd_rec, CLAMP_VAL32(yafrec->packetDeltaCount));
4090 rwRecSetBytes(fwd_rec, CLAMP_VAL32(yafrec->octetDeltaCount));
4091
4092 if (yafrec->reversePacketDeltaCount && yafrec->reverseOctetDeltaCount){
4093 TRACEMSG(1, ("Read a bi-flow yafrec record"));
4094 RWREC_CLEAR(record->rev_rec);
4095 rev_rec = record->rev_rec;
4096 rwRecSetPkts(rev_rec,CLAMP_VAL32(yafrec->reversePacketDeltaCount));
4097 rwRecSetBytes(rev_rec,CLAMP_VAL32(yafrec->reverseOctetDeltaCount));
4098 } else {
4099 TRACEMSG(1, ("Read a forward yafrec record"));
4100 rev_rec = NULL;
4101 }
4102 } else if (yafrec->reversePacketDeltaCount
4103 && yafrec->reverseOctetDeltaCount)
4104 {
4105 /* We have no forward information, only reverse. Swap the IP
4106 * addresses, the ports, the SNMP interfaces, and the VLAN IDs
4107 * in the yafrec */
4108 ski_yafrec_t reversed;
4109
4110 TRACEMSG(1, ("Read a reverse-only yafrec record"));
4111 rev_rec = NULL;
4112
4113 memcpy(reversed.sourceIPv6Address, yafrec->destinationIPv6Address,
4114 sizeof(yafrec->sourceIPv6Address));
4115 memcpy(yafrec->destinationIPv6Address, yafrec->sourceIPv6Address,
4116 sizeof(yafrec->sourceIPv6Address));
4117 memcpy(yafrec->sourceIPv6Address, reversed.sourceIPv6Address,
4118 sizeof(yafrec->sourceIPv6Address));
4119
4120 reversed.sourceIPv4Address = yafrec->destinationIPv4Address;
4121 yafrec->destinationIPv4Address = yafrec->sourceIPv4Address;
4122 yafrec->sourceIPv4Address = reversed.sourceIPv4Address;
4123
4124 reversed.sourceTransportPort = yafrec->destinationTransportPort;
4125 yafrec->destinationTransportPort = yafrec->sourceTransportPort;
4126 yafrec->sourceTransportPort = reversed.sourceTransportPort;
4127
4128 reversed.ingressInterface = yafrec->egressInterface;
4129 yafrec->egressInterface = yafrec->ingressInterface;
4130 yafrec->ingressInterface = reversed.ingressInterface;
4131
4132 if (yafrec->reverseVlanId) {
4133 yafrec->vlanId = yafrec->reverseVlanId;
4134 }
4135
4136 rwRecSetPkts(fwd_rec, CLAMP_VAL32(yafrec->reversePacketDeltaCount));
4137 rwRecSetBytes(fwd_rec, CLAMP_VAL32(yafrec->reverseOctetDeltaCount));
4138
4139 } else {
4140 TRACEMSG(2, (("Found zero bytes or packets; byte=%" PRIu64 ", pkt="
4141 "%" PRIu64 ", rev_byte=%" PRIu64 ", rev_pkt=%" PRIu64),
4142 yafrec->octetDeltaCount, yafrec->packetDeltaCount,
4143 yafrec->reverseOctetDeltaCount,
4144 yafrec->reversePacketDeltaCount));
4145 ski_yafrec_ignore(yafrec, "byte or packet count is zero");
4146 return 0;
4147 }
4148
4149 /* Simple fields */
4150 rwRecSetProto(fwd_rec, yafrec->protocolIdentifier);
4151 rwRecSetFlowType(fwd_rec, yafrec->silkFlowType);
4152 rwRecSetSensor(fwd_rec, yafrec->silkFlowSensor);
4153 rwRecSetApplication(fwd_rec, yafrec->silkAppLabel);
4154 if (rev_rec) {
4155 rwRecSetProto(rev_rec, yafrec->protocolIdentifier);
4156 rwRecSetFlowType(rev_rec, yafrec->silkFlowType);
4157 rwRecSetSensor(rev_rec, yafrec->silkFlowSensor);
4158 rwRecSetApplication(rev_rec, yafrec->silkAppLabel);
4159 }
4160
4161 /* Time stamp */
4162 rwRecSetStartTime(fwd_rec, (sktime_t)yafrec->flowStartMilliseconds);
4163 if (yafrec->flowEndMilliseconds < yafrec->flowStartMilliseconds) {
4164 rwRecSetElapsed(fwd_rec, 0);
4165 } else if ((yafrec->flowEndMilliseconds - yafrec->flowStartMilliseconds)
4166 > UINT32_MAX)
4167 {
4168 rwRecSetElapsed(fwd_rec, UINT32_MAX);
4169 } else {
4170 rwRecSetElapsed(fwd_rec, (yafrec->flowEndMilliseconds
4171 - yafrec->flowStartMilliseconds));
4172 }
4173 if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS) {
4174 char stime_buf[SKTIMESTAMP_STRLEN];
4175 sktimestamp_r(stime_buf, rwRecGetStartTime(fwd_rec), SKTIMESTAMP_UTC);
4176 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
4177 " flowStartMilliseconds=%" PRIu64
4178 ", flowEndMilliseconds=%" PRIu64),
4179 skpcProbeGetName(probe), stime_buf,
4180 (double)rwRecGetElapsed(fwd_rec)/1000,
4181 yafrec->flowStartMilliseconds, yafrec->flowEndMilliseconds);
4182 }
4183
4184 if (rev_rec) {
4185 /* Reverse flow's start time must be increased and its
4186 * duration decreased by its offset from the forward
4187 * record */
4188 rwRecSetStartTime(rev_rec, (rwRecGetStartTime(fwd_rec)
4189 + yafrec->reverseFlowDeltaMilliseconds));
4190 if (rwRecGetElapsed(fwd_rec) < yafrec->reverseFlowDeltaMilliseconds) {
4191 rwRecSetElapsed(rev_rec, 0);
4192 } else {
4193 rwRecSetElapsed(rev_rec, (rwRecGetElapsed(fwd_rec)
4194 - yafrec->reverseFlowDeltaMilliseconds));
4195 }
4196 }
4197
4198 /* IP Addresses */
4199 #if SK_ENABLE_IPV6
4200 /* Use the IPv6 addresses if they are the only ones present or
4201 * both addresses are present and at least one of the IPv6
4202 * addresses is non-zero. */
4203 if ((record->bmap & YAFREC_ONLY_IP6)
4204 || ((record->bmap & YAFREC_IP_BOTH)
4205 && !(SK_IPV6_IS_ZERO(yafrec->sourceIPv6Address)
4206 && SK_IPV6_IS_ZERO(yafrec->destinationIPv6Address))))
4207 {
4208 /* Values found in IPv6 addresses--use them */
4209 rwRecSetIPv6(fwd_rec);
4210 rwRecMemSetSIPv6(fwd_rec, &yafrec->sourceIPv6Address);
4211 rwRecMemSetDIPv6(fwd_rec, &yafrec->destinationIPv6Address);
4212 rwRecMemSetNhIPv6(fwd_rec, &yafrec->ipNextHopIPv6Address);
4213 if (rev_rec) {
4214 rwRecSetIPv6(rev_rec);
4215 rwRecMemSetSIPv6(rev_rec, &yafrec->destinationIPv6Address);
4216 rwRecMemSetDIPv6(rev_rec, &yafrec->sourceIPv6Address);
4217 rwRecMemSetNhIPv6(rev_rec, &yafrec->ipNextHopIPv6Address);
4218 }
4219 } else
4220 #endif /* SK_ENABLE_IPV6 */
4221 {
4222 /* Take values from IPv4 */
4223 rwRecSetSIPv4(fwd_rec, yafrec->sourceIPv4Address);
4224 rwRecSetDIPv4(fwd_rec, yafrec->destinationIPv4Address);
4225 rwRecSetNhIPv4(fwd_rec, yafrec->ipNextHopIPv4Address);
4226 if (rev_rec) {
4227 rwRecSetSIPv4(rev_rec, yafrec->destinationIPv4Address);
4228 rwRecSetDIPv4(rev_rec, yafrec->sourceIPv4Address);
4229 rwRecSetNhIPv4(rev_rec, yafrec->ipNextHopIPv4Address);
4230 }
4231 }
4232
4233 /* SNMP or VLAN interfaces */
4234 if (SKPC_IFVALUE_SNMP == skpcProbeGetInterfaceValueType(probe)) {
4235 rwRecSetInput(fwd_rec, CLAMP_VAL16(yafrec->ingressInterface));
4236 rwRecSetOutput(fwd_rec, CLAMP_VAL16(yafrec->egressInterface));
4237 if (rev_rec) {
4238 rwRecSetInput(rev_rec, CLAMP_VAL16(yafrec->egressInterface));
4239 rwRecSetOutput(rev_rec, CLAMP_VAL16(yafrec->ingressInterface));
4240 }
4241 } else {
4242 rwRecSetInput(fwd_rec, yafrec->vlanId);
4243 rwRecSetOutput(fwd_rec, 0);
4244 if (rev_rec) {
4245 if (record->bmap & TMPL_BIT_reverseVlanId) {
4246 /* Reverse VLAN value exists. Use it */
4247 rwRecSetInput(rev_rec, yafrec->reverseVlanId);
4248 rwRecSetOutput(rev_rec, 0);
4249 } else {
4250 /* we have a single vlanId, so don't swap the values */
4251 rwRecSetInput(rev_rec, yafrec->vlanId);
4252 rwRecSetOutput(rev_rec, 0);
4253 }
4254 }
4255 }
4256
4257 /* Attributes, ICMP Type/Code, Ports, TCP Flags */
4258 tcp_state = yafrec->silkTCPState;
4259
4260 /* Process the flowEndReason and flowAttributes unless one of
4261 * those bits is already set (via silkTCPState). */
4262 if (!(tcp_state
4263 & (SK_TCPSTATE_FIN_FOLLOWED_NOT_ACK | SK_TCPSTATE_TIMEOUT_KILLED
4264 | SK_TCPSTATE_TIMEOUT_STARTED | SK_TCPSTATE_UNIFORM_PACKET_SIZE)))
4265 {
4266 /* Note active timeout */
4267 if ((yafrec->flowEndReason & SKI_END_MASK) == SKI_END_ACTIVE) {
4268 tcp_state |= SK_TCPSTATE_TIMEOUT_KILLED;
4269 }
4270 /* Note continuation */
4271 if (yafrec->flowEndReason & SKI_END_ISCONT) {
4272 tcp_state |= SK_TCPSTATE_TIMEOUT_STARTED;
4273 }
4274 /* Note flows with records of uniform size */
4275 if (yafrec->flowAttributes & SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE) {
4276 tcp_state |= SK_TCPSTATE_UNIFORM_PACKET_SIZE;
4277 }
4278 }
4279
4280 if (IPPROTO_TCP != yafrec->protocolIdentifier) {
4281 /* Free STML list memory */
4282 if (record->bmap & YAFREC_STML) {
4283 fbSubTemplateMultiListClear(&yafrec->stml);
4284 }
4285
4286 /* For TCP Flags, use whatever value was given in
4287 * tcpControlBits; ensure expanded bit in tcp_state is off. */
4288 rwRecSetFlags(fwd_rec, yafrec->tcpControlBits);
4289 tcp_state &= ~SK_TCPSTATE_EXPANDED;
4290 rwRecSetTcpState(fwd_rec, tcp_state);
4291
4292 if (rev_rec) {
4293 /* Use reverse value if given; else foward value */
4294 if (record->bmap & TMPL_BIT_reverseTcpControlBits) {
4295 rwRecSetFlags(rev_rec, yafrec->reverseTcpControlBits);
4296 } else {
4297 rwRecSetFlags(rev_rec, yafrec->tcpControlBits);
4298 }
4299
4300 /* Handle reverse flow attributes */
4301 if (yafrec->reverseFlowAttributes
4302 & SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE)
4303 {
4304 /* ensure it is set */
4305 tcp_state |= SK_TCPSTATE_UNIFORM_PACKET_SIZE;
4306 } else {
4307 /* ensure it it not set */
4308 tcp_state &= ~SK_TCPSTATE_UNIFORM_PACKET_SIZE;
4309 }
4310 rwRecSetTcpState(rev_rec, tcp_state);
4311 }
4312
4313 if (!rwRecIsICMP(fwd_rec)) {
4314 /* Use whatever values are in sport and dport */
4315 rwRecSetSPort(fwd_rec, yafrec->sourceTransportPort);
4316 rwRecSetDPort(fwd_rec, yafrec->destinationTransportPort);
4317 if (rev_rec) {
4318 rwRecSetSPort(rev_rec, yafrec->destinationTransportPort);
4319 rwRecSetDPort(rev_rec, yafrec->sourceTransportPort);
4320 }
4321 } else {
4322 /* ICMP Record */
4323 /* Store ((icmpType << 8) | icmpCode) in the dPort */
4324 rwRecSetSPort(fwd_rec, 0);
4325 if (record->bmap & TMPL_BIT_icmpTypeCodeIPv4) {
4326 rwRecSetDPort(fwd_rec, yafrec->icmpTypeCode);
4327 } else {
4328 rwRecSetDPort(fwd_rec, yafrec->destinationTransportPort);
4329 }
4330
4331 if (rev_rec) {
4332 /* use the same sPort and dPort values */
4333 rwRecSetSPort(rev_rec, 0);
4334 rwRecSetDPort(rev_rec, rwRecGetDPort(fwd_rec));
4335 }
4336 }
4337 } else {
4338 /* Record is TCP */
4339 rwRecSetSPort(fwd_rec, yafrec->sourceTransportPort);
4340 rwRecSetDPort(fwd_rec, yafrec->destinationTransportPort);
4341 if (rev_rec) {
4342 rwRecSetSPort(rev_rec, yafrec->destinationTransportPort);
4343 rwRecSetDPort(rev_rec, yafrec->sourceTransportPort);
4344 }
4345
4346 if (record->bmap & YAFREC_STML) {
4347 /* The TCP flags are in a subTemplateMultiList, copy them
4348 * from the list to the record. The yafrec->stml gets
4349 * initialized by the call to fBufNext().*/
4350 stml = NULL;
4351 while ((stml = fbSubTemplateMultiListGetNextEntry(&yafrec->stml,
4352 stml)))
4353 {
4354 if (SKI_TCP_STML_TID != stml->tmplID) {
4355 fbSubTemplateMultiListEntryNextDataPtr(stml, NULL);
4356 } else {
4357 ski_tcp_stml_t *tcp = NULL;
4358 tcp = ((ski_tcp_stml_t*)
4359 fbSubTemplateMultiListEntryNextDataPtr(stml, tcp));
4360 yafrec->initialTCPFlags = tcp->initialTCPFlags;
4361 yafrec->unionTCPFlags = tcp->unionTCPFlags;
4362 yafrec->reverseInitialTCPFlags
4363 = tcp->reverseInitialTCPFlags;
4364 yafrec->reverseUnionTCPFlags = tcp->reverseUnionTCPFlags;
4365 have_tcp_stml = 1;
4366 }
4367 }
4368 fbSubTemplateMultiListClear(&yafrec->stml);
4369 }
4370
4371 if (yafrec->initialTCPFlags | yafrec->unionTCPFlags) {
4372 rwRecSetInitFlags(fwd_rec, yafrec->initialTCPFlags);
4373 rwRecSetRestFlags(fwd_rec, yafrec->unionTCPFlags);
4374 rwRecSetFlags(fwd_rec, (yafrec->initialTCPFlags
4375 | yafrec->unionTCPFlags));
4376 tcp_state |= SK_TCPSTATE_EXPANDED;
4377 } else {
4378 rwRecSetFlags(fwd_rec, yafrec->tcpControlBits);
4379 tcp_state &= ~SK_TCPSTATE_EXPANDED;
4380 }
4381 rwRecSetTcpState(fwd_rec, tcp_state);
4382
4383 if (rev_rec) {
4384 /* Get reverse TCP flags from the IPFIX record if they are
4385 * available. Otherwise, use those from the forward
4386 * direction. */
4387 if (yafrec->reverseInitialTCPFlags | yafrec->reverseUnionTCPFlags){
4388 rwRecSetInitFlags(rev_rec, yafrec->reverseInitialTCPFlags);
4389 rwRecSetRestFlags(rev_rec, yafrec->reverseUnionTCPFlags);
4390 rwRecSetFlags(rev_rec, (yafrec->reverseInitialTCPFlags
4391 | yafrec->reverseUnionTCPFlags));
4392 tcp_state |= SK_TCPSTATE_EXPANDED;
4393
4394 } else if (record->bmap & TMPL_BIT_reverseTcpControlBits) {
4395 /* Use whatever is in all-flags; clear any init/session
4396 * flags we got from the forward fwd_rec. */
4397 TRACEMSG(2, ("Using reverse TCP flags (all only)"));
4398 rwRecSetFlags(rev_rec, yafrec->reverseTcpControlBits);
4399 rwRecSetInitFlags(rev_rec, 0);
4400 rwRecSetRestFlags(rev_rec, 0);
4401 tcp_state &= ~SK_TCPSTATE_EXPANDED;
4402 } else if (have_tcp_stml
4403 || (record->bmap & TMPL_BIT_reverseInitialTCPFlags))
4404 {
4405 /* If a reverseInitialTCPFlags Element existed on the
4406 * template; use it even though its value is 0. */
4407 TRACEMSG(2, ("Setting all TCP flags to 0"));
4408 rwRecSetFlags(rev_rec, 0);
4409 rwRecSetInitFlags(rev_rec, 0);
4410 rwRecSetRestFlags(rev_rec, 0);
4411 tcp_state &= ~SK_TCPSTATE_EXPANDED;
4412 } else {
4413 /* Use foward flags */
4414 rwRecSetInitFlags(rev_rec, rwRecGetInitFlags(fwd_rec));
4415 rwRecSetRestFlags(rev_rec, rwRecGetRestFlags(fwd_rec));
4416 rwRecSetFlags(rev_rec, rwRecGetFlags(fwd_rec));
4417 }
4418
4419 /* Handle reverse flow attributes */
4420 if (yafrec->reverseFlowAttributes
4421 & SKI_FLOW_ATTRIBUTE_UNIFORM_PACKET_SIZE)
4422 {
4423 /* ensure it is set */
4424 tcp_state |= SK_TCPSTATE_UNIFORM_PACKET_SIZE;
4425 } else {
4426 /* ensure it it not set */
4427 tcp_state &= ~SK_TCPSTATE_UNIFORM_PACKET_SIZE;
4428 }
4429 rwRecSetTcpState(rev_rec, tcp_state);
4430 }
4431 }
4432
4433 /* all done */
4434 return ((rev_rec) ? 2 : 1);
4435 }
4436
4437
4438 /**
4439 * Print a log message saying why a ski_nf9rec_t record was ignored
4440 */
4441 static void
ski_nf9rec_ignore(const ski_record_t * record,const char * reason)4442 ski_nf9rec_ignore(
4443 const ski_record_t *record,
4444 const char *reason)
4445 {
4446 const ski_nf9rec_t *nf9rec = &record->data.nf9rec;
4447 skipaddr_t ipaddr;
4448 char sipbuf[64];
4449 char dipbuf[64];
4450
4451 if (record->bmap & NF9REC_IP6) {
4452 #ifdef SK_HAVE_INET_NTOP
4453 if (!inet_ntop(AF_INET6, &nf9rec->addr.ip6.sourceIPv6Address,
4454 sipbuf, sizeof(sipbuf)))
4455 #endif
4456 {
4457 strcpy(sipbuf, "unknown-v6");
4458 }
4459 #ifdef SK_HAVE_INET_NTOP
4460 if (!inet_ntop(AF_INET6, &nf9rec->addr.ip6.destinationIPv6Address,
4461 dipbuf, sizeof(dipbuf)))
4462 #endif
4463 {
4464 strcpy(dipbuf, "unknown-v6");
4465 }
4466 } else {
4467 skipaddrSetV4(&ipaddr, &nf9rec->addr.ip4.sourceIPv4Address);
4468 skipaddrString(sipbuf, &ipaddr, SKIPADDR_CANONICAL);
4469 skipaddrSetV4(&ipaddr, &nf9rec->addr.ip4.destinationIPv4Address);
4470 skipaddrString(dipbuf, &ipaddr, SKIPADDR_CANONICAL);
4471 }
4472
4473 INFOMSG(("IGNORED|%s|%s|%u|%u|%u|%" PRIu64 "|%" PRIu64 "|%s|"),
4474 sipbuf, dipbuf, nf9rec->sourceTransportPort,
4475 nf9rec->destinationTransportPort,nf9rec->protocolIdentifier,
4476 nf9rec->packetDeltaCount, nf9rec->octetDeltaCount, reason);
4477 }
4478
4479
4480 /**
4481 * Call fBufNext() and transcode the data into one of the
4482 * ski_nf9rec_spec templates, and then convert the structure into
4483 * 0, 1, or 2 SiLK Flow records and fill the record pointers on the
4484 * 'record' structure. The return value indicates the number of
4485 * records converted. Return -1 on failure.
4486 */
4487 static int
ski_nf9rec_next(fBuf_t * fbuf,ski_record_t * record,const skpc_probe_t * probe,GError ** err)4488 ski_nf9rec_next(
4489 fBuf_t *fbuf,
4490 ski_record_t *record,
4491 const skpc_probe_t *probe,
4492 GError **err)
4493 {
4494 char stime_buf[SKTIMESTAMP_STRLEN];
4495 ski_nf9rec_t *nf9rec;
4496 size_t len;
4497 uint16_t int_tid;
4498 rwRec *fwd_rec;
4499 rwRec *rev_rec;
4500
4501 DEFINE_PREFIX_BUF(prefix);
4502 makeTracemsgPrefix(prefix, sizeof(prefix), skpcProbeGetName(probe),
4503 fbSessionGetDomain(fBufGetSession(fbuf)),
4504 record->tid, record->tmpl);
4505 TRACEMSG(2, (("%s bmap " BMAP_PRI ", read by ski_nf9rec_next()"),
4506 prefix, record->bmap));
4507 assert(SKI_RECTYPE_NF9REC == record->rectype);
4508
4509 /* Get a local handle to the record and clear it */
4510 fwd_rec = record->fwd_rec;
4511 RWREC_CLEAR(fwd_rec);
4512 rev_rec = NULL;
4513
4514 /* The lower 16 bits of the context is the TID of the template to
4515 * use to read the record. */
4516 int_tid = record->bmap & UINT16_MAX;
4517 if ((int_tid & SKI_NF9REC_TID) != SKI_NF9REC_TID) {
4518 TRACEMSG(1, ("ski_nf9rec_next() called but TID %#06x does not match",
4519 int_tid));
4520 return ski_ignore_next(fbuf, record, probe, err);
4521 }
4522 if (!fBufSetInternalTemplate(fbuf, int_tid, err)) {
4523 TRACEMSG(1, (("ski_nf9rec_next() called but setting Template"
4524 " TID %#06x failed: %s"), int_tid, (*err)->message));
4525 g_clear_error(err);
4526 return ski_ignore_next(fbuf, record, probe, err);
4527 }
4528 len = sizeof(record->data.nf9rec);
4529 if (!fBufNext(fbuf, (uint8_t *)&record->data.nf9rec, &len, err)) {
4530 return -1;
4531 }
4532 assert(len == sizeof(ski_nf9rec_t));
4533 nf9rec = &record->data.nf9rec;
4534
4535 /* Ignore records that do not have IPv4 addresses when SiLK was
4536 * built without IPv6 support. */
4537 #if !SK_ENABLE_IPV6
4538 if (record->bmap & NF9REC_IP6) {
4539 ski_nf9rec_ignore(record, "IPv6 record");
4540 return 0;
4541 }
4542 #endif /* SK_ENABLE_IPV6 */
4543
4544 /* When the nf9-out-is-reverse quirk is set, flip a bit on the
4545 * record's bitmap so volume is treated as initiator/responder. */
4546 if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_NF9_OUT_IS_REVERSE) {
4547 TRACEMSG(2, (("Modifying record bmap from " BMAP_PRI " to " BMAP_PRI
4548 " due to nf9-out-is-reverse"),
4549 record->bmap, record->bmap | NF9REC_INITIATOR));
4550 record->bmap |= NF9REC_INITIATOR;
4551 }
4552
4553 /* Handle the firewall settings and check for reverse (responder)
4554 * volume. See the big comment in ski_fixrec_next() for all the
4555 * gory details on firewall rules. */
4556 if (record->bmap & (TMPL_BIT_firewallEvent | TMPL_BIT_NF_F_FW_EVENT
4557 | TMPL_BIT_NF_F_FW_EXT_EVENT))
4558 {
4559 /* Handle firewall events */
4560 char msg[64];
4561 uint8_t event = (nf9rec->firewallEvent
4562 ? nf9rec->firewallEvent : nf9rec->NF_F_FW_EVENT);
4563 if (SKIPFIX_FW_EVENT_DENIED == event) {
4564 /* flow denied; there should be no reverse record */
4565 TRACEMSG(1, (("Processing flow denied event as actual flow record;"
4566 " firewallEvent=%u, NF_F_FW_EVENT=%u,"
4567 " NF_F_FW_EXT_EVENT=%u"),
4568 nf9rec->firewallEvent, nf9rec->NF_F_FW_EVENT,
4569 nf9rec->NF_F_FW_EXT_EVENT));
4570 if (SKIPFIX_FW_EVENT_DENIED_CHECK_VALID(nf9rec->NF_F_FW_EXT_EVENT))
4571 {
4572 rwRecSetMemo(fwd_rec, nf9rec->NF_F_FW_EXT_EVENT);
4573 } else {
4574 rwRecSetMemo(fwd_rec, event);
4575 }
4576 /* flow denied events from the Cisco ASA typically have
4577 * zero in the bytes and packets field */
4578 if (nf9rec->octetDeltaCount) {
4579 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4580 if (nf9rec->packetDeltaCount) {
4581 rwRecSetPkts(
4582 fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4583 } else {
4584 TRACEMSG(1, ("Setting forward packets to 1"
4585 " for denied firewall event"));
4586 rwRecSetPkts(fwd_rec, 1);
4587 }
4588 } else if (nf9rec->postOctetDeltaCount
4589 && !(record->bmap & NF9REC_INITIATOR))
4590 {
4591 /* postOctet value is non-zero and it is not
4592 * responderOctets; use in place of standard value */
4593 rwRecSetBytes(fwd_rec,
4594 CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4595 if (nf9rec->postPacketDeltaCount) {
4596 rwRecSetPkts(fwd_rec,
4597 CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4598 } else {
4599 TRACEMSG(1, ("Setting forward packets to 1 for denied"
4600 " firewall event (postOctets non-zero)"));
4601 rwRecSetPkts(fwd_rec, 1);
4602 }
4603 } else if (nf9rec->packetDeltaCount) {
4604 TRACEMSG(1, ("Setting forward bytes equal to packets value"
4605 " for denied firewall event"));
4606 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4607 rwRecSetPkts(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4608 } else {
4609 TRACEMSG(1, ("Setting forward bytes and packets to 1"
4610 " for denied firewall event"));
4611 rwRecSetBytes(fwd_rec, 1);
4612 rwRecSetPkts(fwd_rec, 1);
4613 }
4614
4615 } else if (SKIPFIX_FW_EVENT_DELETED != event) {
4616 /* flow created, flow updated, flow alert, or something
4617 * unexpected. These are ignored */
4618 if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_FIREWALL) {
4619 snprintf(msg, sizeof(msg), "firewallEvent=%u,extended=%u",
4620 event, nf9rec->NF_F_FW_EXT_EVENT);
4621 ski_nf9rec_ignore(record, msg);
4622 }
4623 return 0;
4624 } else {
4625 /* flow deleted */
4626 TRACEMSG(1,(("Processing flow deleted event as actual flow record;"
4627 " firewallEvent=%u, NF_F_FW_EVENT=%u,"
4628 " NF_F_FW_EXT_EVENT=%u"),
4629 nf9rec->firewallEvent, nf9rec->NF_F_FW_EVENT,
4630 nf9rec->NF_F_FW_EXT_EVENT));
4631 /* these normally have a byte count, but not always */
4632 if (nf9rec->octetDeltaCount) {
4633 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4634 if (nf9rec->packetDeltaCount) {
4635 rwRecSetPkts(
4636 fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4637 } else {
4638 TRACEMSG(1, ("Setting forward packets to 1"
4639 " for deleted firewall event"));
4640 rwRecSetPkts(fwd_rec, 1);
4641 }
4642 } else if (nf9rec->postOctetDeltaCount
4643 && !(record->bmap & NF9REC_INITIATOR))
4644 {
4645 /* postOctet value is non-zero and it is not
4646 * responderOctets; use in place of standard value */
4647 rwRecSetBytes(fwd_rec,
4648 CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4649 if (nf9rec->postPacketDeltaCount) {
4650 rwRecSetPkts(fwd_rec,
4651 CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4652 } else {
4653 TRACEMSG(1, ("Setting forward packets to 1 for deleted"
4654 " firewall event (postOctets non-zero)"));
4655 rwRecSetPkts(fwd_rec, 1);
4656 }
4657 } else if (nf9rec->packetDeltaCount) {
4658 TRACEMSG(1, ("Setting forward bytes equal to packets value"
4659 " for deleted firewall event"));
4660 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4661 rwRecSetPkts(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4662 } else {
4663 TRACEMSG(1, ("Setting forward bytes and packets to 1"
4664 " for deleted firewall event"));
4665 rwRecSetBytes(fwd_rec, 1);
4666 rwRecSetPkts(fwd_rec, 1);
4667 }
4668
4669 /* handle reverse record */
4670 if (!(record->bmap & NF9REC_INITIATOR)) {
4671 /* There is no reverse data */
4672 } else if (nf9rec->postOctetDeltaCount) {
4673 /* there is a reverse byte count: postOctet and
4674 * postPacket members hold responder values */
4675 RWREC_CLEAR(record->rev_rec);
4676 rev_rec = record->rev_rec;
4677 rwRecSetBytes(
4678 rev_rec, CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4679 if (nf9rec->postPacketDeltaCount) {
4680 rwRecSetPkts(
4681 rev_rec, CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4682 } else {
4683 TRACEMSG(1, ("Setting reverse packets to 1"
4684 " for deleted firewall event"));
4685 rwRecSetPkts(rev_rec, 1);
4686 }
4687 } else if (nf9rec->postPacketDeltaCount) {
4688 /* there is a reverse packet count */
4689 RWREC_CLEAR(record->rev_rec);
4690 rev_rec = record->rev_rec;
4691 TRACEMSG(1, ("Setting reverse bytes equal to packets value"
4692 " for deleted firewall event"));
4693 rwRecSetBytes(
4694 rev_rec, CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4695 rwRecSetPkts(
4696 rev_rec, CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4697 }
4698 /* else no reverse record */
4699 }
4700 } else if (!(record->bmap & NF9REC_INITIATOR)) {
4701 /* there is no firewall event data and no reverse data; set
4702 * forward data */
4703 if (nf9rec->octetDeltaCount) {
4704 /* use the forward octet count which is non-zero */
4705 if (nf9rec->packetDeltaCount) {
4706 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4707 rwRecSetPkts(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4708 } else if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_ZERO_PACKETS) {
4709 TRACEMSG(1, ("Setting forward packets to 1"
4710 " outside of firewall event handler"));
4711 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4712 rwRecSetPkts(fwd_rec, 1);
4713 } else {
4714 ski_nf9rec_ignore(record, "No forward packets");
4715 return 0;
4716 }
4717 } else if (nf9rec->postOctetDeltaCount) {
4718 /* postOctet value is non-zero and it is not
4719 * responderOctets; use in place of standard value */
4720 if (nf9rec->postPacketDeltaCount) {
4721 rwRecSetBytes(fwd_rec,
4722 CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4723 rwRecSetPkts(fwd_rec,
4724 CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4725 } else if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_ZERO_PACKETS) {
4726 TRACEMSG(1, ("Setting forward packets to 1"
4727 " outside of firewall event handler"));
4728 rwRecSetBytes(fwd_rec,
4729 CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4730 rwRecSetPkts(fwd_rec, 1);
4731 } else {
4732 ski_nf9rec_ignore(record, "No forward packets");
4733 return 0;
4734 }
4735 } else {
4736 ski_nf9rec_ignore(record, "No forward octets");
4737 return 0;
4738 }
4739 } else if (nf9rec->octetDeltaCount) {
4740 /* the template included initiatorOctets & responderOctets and
4741 * there is forward volume */
4742 if (nf9rec->packetDeltaCount) {
4743 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4744 rwRecSetPkts(fwd_rec, CLAMP_VAL32(nf9rec->packetDeltaCount));
4745 } else if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_ZERO_PACKETS) {
4746 TRACEMSG(1, ("Setting forward packets to 1"
4747 " outside of firewall event handler"));
4748 rwRecSetBytes(fwd_rec, CLAMP_VAL32(nf9rec->octetDeltaCount));
4749 rwRecSetPkts(fwd_rec, 1);
4750 } else {
4751 ski_nf9rec_ignore(record, "No forward packets");
4752 return 0;
4753 }
4754 if (nf9rec->postOctetDeltaCount) {
4755 /* there is a reverse byte count (responderOctets) */
4756 if (nf9rec->postPacketDeltaCount) {
4757 RWREC_CLEAR(record->rev_rec);
4758 rev_rec = record->rev_rec;
4759 rwRecSetBytes(
4760 rev_rec, CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4761 rwRecSetPkts(
4762 rev_rec, CLAMP_VAL32(nf9rec->postPacketDeltaCount));
4763 } else if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_ZERO_PACKETS) {
4764 RWREC_CLEAR(record->rev_rec);
4765 rev_rec = record->rev_rec;
4766 TRACEMSG(1, ("Setting reverse packets to 1"
4767 " outside of firewall event handler"));
4768 rwRecSetBytes(
4769 rev_rec, CLAMP_VAL32(nf9rec->postOctetDeltaCount));
4770 rwRecSetPkts(rev_rec, 1);
4771 } else {
4772 TRACEMSG(
4773 1, ("Ignoring reverse bytes since no reverse packets"));
4774 }
4775 }
4776 } else if (nf9rec->postOctetDeltaCount) {
4777 /* reverse only record */
4778 ski_nf9rec_ignore(record,
4779 "No forward octets (reverse octets are non-zero)");
4780 return 0;
4781 } else {
4782 ski_nf9rec_ignore(record, "No forward/reverse octets");
4783 return 0;
4784 }
4785
4786 TRACEMSG(1, ("Read a %s nf9rec record", ((rev_rec)?"bi-flow":"forward")));
4787
4788 /* Handle the IP addresses */
4789 #if SK_ENABLE_IPV6
4790 if (record->bmap & NF9REC_IP6) {
4791 rwRecSetIPv6(fwd_rec);
4792 rwRecMemSetSIPv6(fwd_rec, nf9rec->addr.ip6.sourceIPv6Address);
4793 rwRecMemSetDIPv6(fwd_rec, nf9rec->addr.ip6.destinationIPv6Address);
4794 rwRecMemSetNhIPv6(fwd_rec, nf9rec->addr.ip6.ipNextHopIPv6Address);
4795 if (rev_rec) {
4796 rwRecSetIPv6(rev_rec);
4797 rwRecMemSetSIPv6(rev_rec, &nf9rec->addr.ip6.destinationIPv6Address);
4798 rwRecMemSetDIPv6(rev_rec, &nf9rec->addr.ip6.sourceIPv6Address);
4799 rwRecMemSetNhIPv6(rev_rec, &nf9rec->addr.ip6.ipNextHopIPv6Address);
4800 }
4801 } else
4802 #endif /* SK_ENABLE_IPV6 */
4803 {
4804 /* Take values from IPv4 */
4805 rwRecSetSIPv4(fwd_rec, nf9rec->addr.ip4.sourceIPv4Address);
4806 rwRecSetDIPv4(fwd_rec, nf9rec->addr.ip4.destinationIPv4Address);
4807 rwRecSetNhIPv4(fwd_rec, nf9rec->addr.ip4.ipNextHopIPv4Address);
4808 if (rev_rec) {
4809 rwRecSetSIPv4(rev_rec, nf9rec->addr.ip4.destinationIPv4Address);
4810 rwRecSetDIPv4(rev_rec, nf9rec->addr.ip4.sourceIPv4Address);
4811 rwRecSetNhIPv4(rev_rec, nf9rec->addr.ip4.ipNextHopIPv4Address);
4812 }
4813 }
4814
4815 /* Time stamp */
4816 if (record->bmap & NF9REC_MILLI) {
4817 if (0 == nf9rec->t.milli.flowStartMilliseconds) {
4818 rwRecSetStartTime(
4819 fwd_rec, (sktime_t)nf9rec->t.milli.flowEndMilliseconds);
4820 rwRecSetElapsed(fwd_rec, 0);
4821 } else {
4822 int64_t dur = (nf9rec->t.milli.flowEndMilliseconds
4823 - nf9rec->t.milli.flowStartMilliseconds);
4824 rwRecSetStartTime(
4825 fwd_rec, (sktime_t)nf9rec->t.milli.flowStartMilliseconds);
4826 if (dur < 0) {
4827 rwRecSetElapsed(fwd_rec, 0);
4828 } else if (dur > (int64_t)UINT32_MAX) {
4829 rwRecSetElapsed(fwd_rec, UINT32_MAX);
4830 } else {
4831 rwRecSetElapsed(fwd_rec, (uint32_t)dur);
4832 }
4833 }
4834 if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS) {
4835 sktimestamp_r(stime_buf, rwRecGetStartTime(fwd_rec),
4836 SKTIMESTAMP_UTC);
4837 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
4838 " flowStartMilliseconds=%" PRIu64
4839 ", flowEndMilliseconds=%" PRIu64),
4840 skpcProbeGetName(probe), stime_buf,
4841 (double)rwRecGetElapsed(fwd_rec)/1000,
4842 nf9rec->t.milli.flowStartMilliseconds,
4843 nf9rec->t.milli.flowEndMilliseconds);
4844 }
4845 } else {
4846 /* Times based on flow generator system uptimes (Netflow v9) */
4847 intmax_t uptime, difference;
4848 sktime_t export_msec;
4849 const char *rollover_first;
4850 const char *rollover_last = "";
4851
4852 assert(record->bmap & NF9REC_SYSUP);
4853
4854 /* Compute the uptime: systemInitTimeMilliseconds is the
4855 * absolute router boot time (msec), and libfixbuf sets it by
4856 * subtracting the NFv9 uptime (msec) from the record's
4857 * absolute export time (sec). */
4858 export_msec = sktimeCreate(fBufGetExportTime(fbuf), 0);
4859 uptime = export_msec - nf9rec->t.sysup.systemInitTimeMilliseconds;
4860 if (skpcProbeGetQuirks(probe) & SKPC_QUIRK_NF9_SYSUPTIME_SECS) {
4861 /* uptime was reported in seconds, not msec */
4862 TRACEMSG(2, (("Before adjustment: exportTimeMillisec %" PRIu64
4863 ", initTimeMillisec %" PRIu64 ", uptime %" PRIdMAX
4864 ", startUpTime %" PRIu32 ", endUpTime %" PRIu32
4865 ", packets %" PRIu32),
4866 export_msec,
4867 nf9rec->t.sysup.systemInitTimeMilliseconds,
4868 uptime, nf9rec->t.sysup.flowStartSysUpTime,
4869 nf9rec->t.sysup.flowEndSysUpTime,
4870 rwRecGetPkts(fwd_rec)));
4871 uptime *= 1000;
4872 nf9rec->t.sysup.systemInitTimeMilliseconds = export_msec - uptime;
4873 if (rwRecGetPkts(fwd_rec) == 1
4874 && (nf9rec->t.sysup.flowEndSysUpTime
4875 < nf9rec->t.sysup.flowStartSysUpTime))
4876 {
4877 /* sometimes the end time for single packet flows is
4878 * very different than the start time. */
4879 nf9rec->t.sysup.flowEndSysUpTime
4880 = nf9rec->t.sysup.flowStartSysUpTime;
4881 }
4882 }
4883
4884 /* Compute duration */
4885 if (nf9rec->t.sysup.flowStartSysUpTime
4886 <= nf9rec->t.sysup.flowEndSysUpTime)
4887 {
4888 rwRecSetElapsed(fwd_rec, (nf9rec->t.sysup.flowEndSysUpTime
4889 - nf9rec->t.sysup.flowStartSysUpTime));
4890 } else {
4891 /* assume EndTime rolled-over and start did not */
4892 rwRecSetElapsed(
4893 fwd_rec, (ROLLOVER32 + nf9rec->t.sysup.flowEndSysUpTime
4894 - nf9rec->t.sysup.flowStartSysUpTime));
4895 rollover_last = ", assume flowEndSysUpTime rollover";
4896 }
4897 /* Compute uptime, checking for rollover */
4898 difference = uptime - nf9rec->t.sysup.flowStartSysUpTime;
4899 if (difference > MAXIMUM_FLOW_TIME_DEVIATION) {
4900 /* assume upTime is set before record is composed and
4901 * that start-time has rolled over. */
4902 rwRecSetStartTime(
4903 fwd_rec, (nf9rec->t.sysup.systemInitTimeMilliseconds
4904 + nf9rec->t.sysup.flowStartSysUpTime
4905 + ROLLOVER32));
4906 rollover_first = ", assume flowStartSysUpTime rollover";
4907 } else if (-difference > MAXIMUM_FLOW_TIME_DEVIATION) {
4908 /* assume upTime is set after record is composed and
4909 * that upTime has rolled over. */
4910 rwRecSetStartTime(
4911 fwd_rec, (nf9rec->t.sysup.systemInitTimeMilliseconds
4912 + nf9rec->t.sysup.flowStartSysUpTime
4913 - ROLLOVER32));
4914 rollover_first = ", assume sysUpTime rollover";
4915 } else {
4916 /* times look reasonable; assume no roll over */
4917 rwRecSetStartTime(
4918 fwd_rec, (nf9rec->t.sysup.systemInitTimeMilliseconds
4919 + nf9rec->t.sysup.flowStartSysUpTime));
4920 rollover_first = "";
4921 }
4922 if (skpcProbeGetLogFlags(probe) & SOURCE_LOG_TIMESTAMPS) {
4923 sktimestamp_r(
4924 stime_buf, rwRecGetStartTime(fwd_rec), SKTIMESTAMP_UTC);
4925 INFOMSG(("'%s': Set sTime=%sZ, dur=%.3fs from incoming record"
4926 " flowStartSysUpTime=%" PRIu32
4927 ", flowEndSysUpTime=%" PRIu32
4928 ", systemInitTimeMilliseconds=%" PRIu64
4929 ", exportTimeSeconds=%" PRIu32
4930 ", calculated sysUpTime=%" PRIdMAX "%s%s"),
4931 skpcProbeGetName(probe),
4932 stime_buf, (double)rwRecGetElapsed(fwd_rec)/1000,
4933 nf9rec->t.sysup.flowStartSysUpTime,
4934 nf9rec->t.sysup.flowEndSysUpTime,
4935 nf9rec->t.sysup.systemInitTimeMilliseconds,
4936 fBufGetExportTime(fbuf), uptime,
4937 rollover_first, rollover_last);
4938 }
4939 }
4940
4941 /* SNMP or VLAN interfaces */
4942 if (SKPC_IFVALUE_SNMP == skpcProbeGetInterfaceValueType(probe)) {
4943 rwRecSetInput(fwd_rec, CLAMP_VAL16(nf9rec->ingressInterface));
4944 rwRecSetOutput(fwd_rec, CLAMP_VAL16(nf9rec->egressInterface));
4945 } else {
4946 rwRecSetInput(fwd_rec, nf9rec->vlanId);
4947 rwRecSetOutput(fwd_rec, nf9rec->postVlanId);
4948 }
4949
4950 /* Check for active timeout flag in the flowEndReason */
4951 if ((nf9rec->flowEndReason & SKI_END_MASK) == SKI_END_ACTIVE) {
4952 rwRecSetTcpState(fwd_rec, SK_TCPSTATE_TIMEOUT_KILLED);
4953 }
4954
4955 rwRecSetProto(fwd_rec, nf9rec->protocolIdentifier);
4956
4957 /* For TCP Flags, use whatever value was given in tcpControlBits,
4958 * regardless of protocol */
4959 rwRecSetFlags(fwd_rec, nf9rec->tcpControlBits);
4960 if (!rwRecIsICMP(fwd_rec)) {
4961 /* Use whatever values are in sport and dport, regardless of
4962 * protocol */
4963 rwRecSetSPort(fwd_rec, nf9rec->sourceTransportPort);
4964 rwRecSetDPort(fwd_rec, nf9rec->destinationTransportPort);
4965 if (rev_rec) {
4966 rwRecSetSPort(rev_rec, nf9rec->destinationTransportPort);
4967 rwRecSetDPort(rev_rec, nf9rec->sourceTransportPort);
4968 }
4969 } else {
4970 /* ICMP Record */
4971 /* Store ((icmpType << 8) | icmpCode) in the dPort if
4972 * available; else use the dport */
4973 rwRecSetSPort(fwd_rec, 0);
4974 if (record->bmap & TMPL_BIT_icmpTypeCodeIPv4) {
4975 rwRecSetDPort(fwd_rec, nf9rec->icmpTypeCode);
4976 } else if (record->bmap & TMPL_BIT_icmpTypeIPv4) {
4977 rwRecSetDPort(fwd_rec,((nf9rec->icmpType << 8)| nf9rec->icmpCode));
4978 } else {
4979 rwRecSetDPort(fwd_rec, nf9rec->destinationTransportPort);
4980 }
4981 if (rev_rec) {
4982 rwRecSetSPort(rev_rec, 0);
4983 rwRecSetDPort(rev_rec, rwRecGetDPort(fwd_rec));
4984 }
4985 }
4986
4987 if (rev_rec) {
4988 rwRecSetStartTime(rev_rec, rwRecGetStartTime(fwd_rec));
4989 rwRecSetElapsed(rev_rec, rwRecGetElapsed(fwd_rec));
4990 rwRecSetInput(rev_rec, rwRecGetOutput(fwd_rec));
4991 rwRecSetOutput(rev_rec, rwRecGetInput(fwd_rec));
4992 rwRecSetTcpState(rev_rec, rwRecGetTcpState(fwd_rec));
4993 rwRecSetProto(fwd_rec, nf9rec->protocolIdentifier);
4994 }
4995
4996 /* all done */
4997 return ((rev_rec) ? 2 : 1);
4998 }
4999
5000
5001 /*
5002 * Helper function for ipfix_reader().
5003 *
5004 * Handle the result of converting an IPFIX record to SiLK Flow
5005 * records on 'source': update statistics, store the reverse record
5006 * (if any) into the circular buffer, and move to the next location
5007 * in the circular buffer. The expected values for 'read_result'
5008 * are 0 (record ignored), 1 (uni-flow), and 2 (bi-flow).
5009 */
5010 static void
ipfix_reader_update_circbuf(skIPFIXSource_t * source,int read_result)5011 ipfix_reader_update_circbuf(
5012 skIPFIXSource_t *source,
5013 int read_result)
5014 {
5015 #if !SOURCE_LOG_MAX_PENDING_WRITE
5016 #define circbuf_count_addr NULL
5017 #else
5018 #define circbuf_count_addr &circbuf_count
5019 uint32_t circbuf_count;
5020 #endif
5021
5022 switch (read_result) {
5023 case 0:
5024 /* Ignore record */
5025 pthread_mutex_lock(&source->stats_mutex);
5026 ++source->ignored_flows;
5027 pthread_mutex_unlock(&source->stats_mutex);
5028 break;
5029
5030 case 1:
5031 /* We have filled the empty source->current_record slot.
5032 * Advance to the next record location. */
5033 if (skCircBufGetWriterBlock(
5034 source->circbuf, &source->current_record, circbuf_count_addr))
5035 {
5036 assert(source->stopped);
5037 break;
5038 }
5039 pthread_mutex_lock(&source->stats_mutex);
5040 ++source->forward_flows;
5041 #if SOURCE_LOG_MAX_PENDING_WRITE
5042 if (circbuf_count > source->max_pending) {
5043 source->max_pending = circbuf_count;
5044 }
5045 #endif
5046 pthread_mutex_unlock(&source->stats_mutex);
5047 break;
5048
5049 case 2:
5050 /* copy reverse record into the circular buf */
5051 if (skCircBufGetWriterBlock(
5052 source->circbuf, &source->current_record, NULL))
5053 {
5054 assert(source->stopped);
5055 break;
5056 }
5057 memcpy(source->current_record, &source->rvbuf, sizeof(source->rvbuf));
5058 if (skCircBufGetWriterBlock(
5059 source->circbuf, &source->current_record, circbuf_count_addr))
5060 {
5061 assert(source->stopped);
5062 break;
5063 }
5064 pthread_mutex_lock(&source->stats_mutex);
5065 ++source->forward_flows;
5066 ++source->reverse_flows;
5067 #if SOURCE_LOG_MAX_PENDING_WRITE
5068 if (circbuf_count > source->max_pending) {
5069 source->max_pending = circbuf_count;
5070 }
5071 #endif
5072 pthread_mutex_unlock(&source->stats_mutex);
5073 break;
5074
5075 default:
5076 skAbortBadCase(read_result);
5077 }
5078 }
5079
5080
5081 /*
5082 * THREAD ENTRY POINT
5083 *
5084 * The ipfix_reader() function is the main thread for listening to
5085 * data from a single fbListener_t object. It is passed the
5086 * skIPFIXSourceBase_t object containing that fbListener_t object.
5087 * This thread is started from the ipfixSourceCreateFromSockaddr()
5088 * function.
5089 */
5090 void *
ipfix_reader(void * vsource_base)5091 ipfix_reader(
5092 void *vsource_base)
5093 {
5094 #define IS_UDP (base->connspec->transport == FB_UDP)
5095 skIPFIXSourceBase_t *base = (skIPFIXSourceBase_t *)vsource_base;
5096 skIPFIXConnection_t *conn = NULL;
5097 skIPFIXSource_t *source = NULL;
5098 GError *err = NULL;
5099 fBuf_t *fbuf = NULL;
5100 int rv;
5101
5102 TRACE_ENTRY;
5103
5104 /* Ignore all signals */
5105 skthread_ignore_signals();
5106
5107 /* Communicate that the thread has started */
5108 pthread_mutex_lock(&base->mutex);
5109 pthread_cond_signal(&base->cond);
5110 base->started = 1;
5111 base->running = 1;
5112 DEBUGMSG("fixbuf listener started for [%s]:%s",
5113 base->connspec->host ? base->connspec->host : "*",
5114 base->connspec->svc);
5115 pthread_mutex_unlock(&base->mutex);
5116
5117 TRACEMSG(3, ("base %p started for [%s]:%s",
5118 base, base->connspec->host ? base->connspec->host : "*",
5119 base->connspec->svc));
5120
5121 /* Loop until destruction of the base object */
5122 while (!base->destroyed) {
5123
5124 /* wait for a new connection */
5125 fbuf = fbListenerWait(base->listener, &err);
5126 if (NULL == fbuf) {
5127 if (NULL == err) {
5128 /* got an unknown error---treat as fatal */
5129 NOTICEMSG("fixbuf listener shutting down:"
5130 " unknown error from fbListenerWait");
5131 break;
5132 }
5133
5134 if (g_error_matches(err,SK_IPFIXSOURCE_DOMAIN,SK_IPFIX_ERROR_CONN))
5135 {
5136 /* the callback rejected the connection (TCP only) */
5137 DEBUGMSG("fixbuf listener rejected connection: %s",
5138 err->message);
5139 g_clear_error(&err);
5140 continue;
5141 }
5142
5143 /* FB_ERROR_NLREAD indicates interrupted read, either
5144 * because the socket received EINTR or because
5145 * fbListenerInterrupt() was called.
5146 *
5147 * FB_ERROR_EOM indicates an end-of-message, and needs to
5148 * be ignored when running in manual mode. */
5149 if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD)
5150 || g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_EOM))
5151 {
5152 TRACEMSG(1, (("fixbuf listener received %s"
5153 " while waiting for a connection: %s"),
5154 ((FB_ERROR_EOM == err->code)
5155 ? "end-of-message" : "interrupted read"),
5156 err->message));
5157 g_clear_error(&err);
5158 continue;
5159 }
5160
5161 /* treat any other error as fatal */
5162 NOTICEMSG(("fixbuf listener shutting down: %s"
5163 " (d=%" PRIu32 ",c=%" PRId32 ")"),
5164 err->message, (uint32_t)err->domain, (int32_t)err->code);
5165 g_clear_error(&err);
5166 break;
5167 }
5168
5169 /* Make sure the fbuf is in manual mode. Manual mode is
5170 * required to multiplex among multiple collectors using
5171 * fbListenerWait(). Without this, fBufNext() blocks once the
5172 * buffer is empty until it has messages again. Instead, we
5173 * want to switch to a different fbuf once we read all records
5174 * in the current buffer. */
5175 fBufSetAutomaticMode(fbuf, 0);
5176
5177 #if 0
5178 /* Added #if 0 since this should not be needed; the callback
5179 * is added to the session when the session is allocated. */
5180
5181 /* Invoke a callback when a new template arrives that tells
5182 * fixbuf how to map from the subTemplateMultiList used by YAF
5183 * for TCP information to our internal strucure. */
5184 skiAddSessionCallback(fBufGetSession(fbuf));
5185 #endif /* 0 */
5186
5187 /* Loop over fBufNext() until the buffer empties, we begin to
5188 * shutdown, or there is an error. All the ski_*_next()
5189 * functions call fBufNext() internally. */
5190 conn = NULL;
5191 while (!base->destroyed) {
5192 ski_rectype_t rectype;
5193 ski_record_t record;
5194
5195 /* Determine what type of record is next; this calls
5196 * fBufNextCollectionTemplate(), and gives error at end of
5197 * message */
5198 rectype = ski_rectype_next(fbuf, &record, &err);
5199
5200 if (!conn) {
5201 /* Get the connection data associated with this fBuf_t
5202 * object. In manual mode this loop processes a
5203 * single msg, which must have a single source. */
5204 conn = ((skIPFIXConnection_t *)
5205 fbCollectorGetContext(fBufGetCollector(fbuf)));
5206 if (conn == NULL) {
5207 /* If conn is NULL, we must have rejected a UDP
5208 * connection from the appInit function. */
5209 assert(rectype == SKI_RECTYPE_ERROR);
5210 TRACEMSG(2, ("<UNKNOWN>: %s", ski_rectype_name[rectype]));
5211 break;
5212 }
5213 source = conn->source;
5214 assert(source != NULL);
5215
5216 TRACEMSG(5, ("'%s': conn = %p, source = %p, fbuf = %p",
5217 source->name, conn, source, fbuf));
5218
5219 /* If this source is stopped, end the connection. If
5220 * source is told to stop while processing msg, the
5221 * circbuf will inform us. */
5222 if (source->stopped) {
5223 TRACEMSG(1, (("'%s': Closing connection since"
5224 " source is stopping"), source->name));
5225 if (!IS_UDP) {
5226 fBufFree(fbuf);
5227 fbuf = NULL;
5228 }
5229 if (rectype == SKI_RECTYPE_ERROR) {
5230 g_clear_error(&err);
5231 }
5232 break;
5233 }
5234 }
5235
5236 /* There is a 'break' statement after this switch(), so
5237 * any "normal" event (no error condition and buffer is
5238 * not empty) must use 'continue' after processing to
5239 * continue the loop. Any 'break' indicates an error. */
5240 switch (rectype) {
5241 case SKI_RECTYPE_ERROR:
5242 TRACEMSG(2, ("'%s': %s",
5243 source->name, ski_rectype_name[rectype]));
5244 break; /* error */
5245
5246 case SKI_RECTYPE_IGNORE:
5247 /* An unknown/ignored template */
5248 if (!ski_ignore_next(fbuf, &record, source->probe, &err)){
5249 /* should have been able to read something */
5250 TRACEMSG(2, ("'%s': %s and ski_ignore_next() is FALSE",
5251 source->name, ski_rectype_name[rectype]));
5252 break; /* error */
5253 }
5254 continue;
5255
5256 case SKI_RECTYPE_YAFSTATS:
5257 if (!ski_yafstats_next(fbuf, &record, source->probe, &err)){
5258 /* should have been able to read the stats */
5259 TRACEMSG(2, ("'%s': %s and ski_yafstats_next() is FALSE",
5260 source->name, ski_rectype_name[rectype]));
5261 break; /* error */
5262 }
5263 ski_yafstats_update_source(
5264 source, &record, &conn->prev_yafstats);
5265 continue;
5266
5267 case SKI_RECTYPE_TOMBSTONE:
5268 if (!ski_tombstone_next(fbuf, &record, source->probe, &err)){
5269 TRACEMSG(2, ("'%s': %s and ski_tombstone_next() is FALSE",
5270 source->name, ski_rectype_name[rectype]));
5271 break; /* error */
5272 }
5273 continue;
5274
5275 case SKI_RECTYPE_NF9SAMPLING:
5276 if (!ski_nf9sampling_next(fbuf, &record, source->probe, &err)){
5277 /* should have been able to read something */
5278 TRACEMSG(2, ("'%s': %s and ski_nf9sampling_next() is FALSE",
5279 source->name, ski_rectype_name[rectype]));
5280 break; /* error */
5281 }
5282 continue;
5283
5284 case SKI_RECTYPE_FIXREC:
5285 assert(source->current_record);
5286 record.fwd_rec = source->current_record;
5287 record.rev_rec = &source->rvbuf;
5288 rv = ski_fixrec_next(fbuf, &record, source->probe, &err);
5289 if (-1 == rv) {
5290 TRACEMSG(2, ("'%s': %s and ski_fixrec_next() returned -1",
5291 source->name, ski_rectype_name[rectype]));
5292 break; /* error */
5293 }
5294 ipfix_reader_update_circbuf(source, rv);
5295 continue;
5296
5297 case SKI_RECTYPE_YAFREC:
5298 assert(source->current_record);
5299 record.fwd_rec = source->current_record;
5300 record.rev_rec = &source->rvbuf;
5301 rv = ski_yafrec_next(fbuf, &record, source->probe, &err);
5302 if (-1 == rv) {
5303 TRACEMSG(2, ("'%s': %s and ski_yafrec_next() returned -1",
5304 source->name, ski_rectype_name[rectype]));
5305 break; /* error */
5306 }
5307 ipfix_reader_update_circbuf(source, rv);
5308 continue;
5309
5310 case SKI_RECTYPE_NF9REC:
5311 assert(source->current_record);
5312 record.fwd_rec = source->current_record;
5313 record.rev_rec = &source->rvbuf;
5314 rv = ski_nf9rec_next(fbuf, &record, source->probe, &err);
5315 if (-1 == rv) {
5316 TRACEMSG(2, ("'%s': %s and ski_nf9rec_next() returned -1",
5317 source->name, ski_rectype_name[rectype]));
5318 break; /* error */
5319 }
5320 ipfix_reader_update_circbuf(source, rv);
5321 continue;
5322 } /* switch (rectype) */
5323
5324 /* If we get here, stop reading from the current fbuf.
5325 * This may be because the fbuf is empty, because we are
5326 * shutting down, or due to an error. */
5327 break;
5328 }
5329 /* Finished with current IPFIX message, encountered an error
5330 * while processing message, or we are shutting down */
5331
5332 /* Handle FB_ERROR_NLREAD and FB_ERROR_EOM returned by
5333 * fBufNext() in the same way as when they are returned by
5334 * fbListenerWait().
5335 *
5336 * FB_ERROR_NLREAD is also returned when a previously rejected
5337 * UDP client attempts to send more data. */
5338 if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_NLREAD)
5339 || g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_EOM))
5340 {
5341 TRACEMSG(1, ("'%s': Ignoring %s: %s",
5342 (conn ? source->name : "<UNKNOWN>"),
5343 ((FB_ERROR_EOM == err->code)
5344 ? "end-of-message" : "interrupted read"),
5345 err->message));
5346 /* Do not free the fbuf here. The fbuf is owned by the
5347 * listener, and will be freed when the listener is freed.
5348 * Calling fBufFree() here would cause fixbuf to forget
5349 * the current template, which would cause it to ignore
5350 * records until a new template is transmitted. */
5351 g_clear_error(&err);
5352 continue;
5353 }
5354
5355 /* SK_IPFIX_ERROR_CONN indicates that a new UDP "connection"
5356 * was rejected by the appInit function in a multi-UDP
5357 * libfixbuf session. Do not free the fbuf since we do not
5358 * have a connection yet; wait for another connection. */
5359 if (g_error_matches(err, SK_IPFIXSOURCE_DOMAIN, SK_IPFIX_ERROR_CONN)) {
5360 assert(IS_UDP);
5361 INFOMSG("Closing connection: %s", err->message);
5362 g_clear_error(&err);
5363 continue;
5364 }
5365
5366 /* Handle shutdown events */
5367 if (base->destroyed) {
5368 break;
5369 }
5370
5371 /* Source has stopped, loop for the next source. */
5372 if (conn && source->stopped) {
5373 continue;
5374 }
5375
5376 /* The remainder of the code in this while() block assumes
5377 * that 'source' is valid, which is only true if 'conn' is
5378 * non-NULL. Trap that here, just in case. */
5379 if (NULL == conn) {
5380 if (NULL == err) {
5381 /* give up when error code is unknown */
5382 NOTICEMSG("'<UNKNOWN>': fixbuf listener shutting down:"
5383 " unknown error from fBufNext");
5384 break;
5385 }
5386 DEBUGMSG("Ignoring packet: %s (d=%" PRIu32 ",c=%" PRId32 ")",
5387 err->message, (uint32_t)err->domain, (int32_t)err->code);
5388 g_clear_error(&err);
5389 continue;
5390 }
5391
5392 /* FB_ERROR_NETFLOWV9 indicates an anomalous netflow v9
5393 * record; these do not disturb fixbuf state, and so should be
5394 * ignored. */
5395 if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_NETFLOWV9)) {
5396 DEBUGMSG("'%s': Ignoring NetFlowV9 record: %s",
5397 source->name, err->message);
5398 g_clear_error(&err);
5399 continue;
5400 }
5401
5402 /* FB_ERROR_SFLOW indicates an anomalous sFlow
5403 * record; these do not disturb fixbuf state, and so should be
5404 * ignored. */
5405 if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_SFLOW)) {
5406 DEBUGMSG("'%s': Ignoring sFlow record: %s",
5407 source->name, err->message);
5408 g_clear_error(&err);
5409 continue;
5410 }
5411
5412 /* FB_ERROR_TMPL indicates a set references a template ID for
5413 * which there is no template. Log and continue. */
5414 if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_TMPL)) {
5415 DEBUGMSG("'%s': Ignoring data set: %s",
5416 source->name, err->message);
5417 g_clear_error(&err);
5418 continue;
5419 }
5420
5421 /* FB_ERROR_IPFIX indicates invalid IPFIX. We could simply
5422 * choose to log and continue; instead we choose to log, close
5423 * the connection, and continue. */
5424 if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_IPFIX)) {
5425 if (IS_UDP) {
5426 DEBUGMSG("'%s': Ignoring invalid IPFIX: %s",
5427 source->name, err->message);
5428 } else {
5429 INFOMSG("'%s': Closing connection; received invalid IPFIX: %s",
5430 source->name, err->message);
5431 fBufFree(fbuf);
5432 fbuf = NULL;
5433 }
5434 g_clear_error(&err);
5435 continue;
5436 }
5437
5438 /* FB_ERROR_EOF indicates that the connection associated with
5439 * this fBuf_t object has finished. In this case, free the
5440 * fBuf_t object to close the connection. Do not free the
5441 * fBuf_t for UDP connections, since these UDP-based fBuf_t
5442 * objects are freed with the listener. */
5443 if (g_error_matches(err, FB_ERROR_DOMAIN, FB_ERROR_EOF)) {
5444 if (!IS_UDP) {
5445 INFOMSG("'%s': Closing connection: %s",
5446 source->name, err->message);
5447 fBufFree(fbuf);
5448 fbuf = NULL;
5449 }
5450 g_clear_error(&err);
5451 continue;
5452 }
5453
5454 /* Handle an unexpected error generated by fixbuf */
5455 if (err && err->domain == FB_ERROR_DOMAIN) {
5456 if (IS_UDP) {
5457 DEBUGMSG(("'%s': Ignoring UDP packet: %s"
5458 " (d=%" PRIu32 ",c=%" PRId32 ")"),
5459 source->name, err->message,
5460 (uint32_t)err->domain, (int32_t)err->code);
5461 } else {
5462 INFOMSG(("'%s': Closing connection: %s"
5463 " (d=%" PRIu32 ",c=%" PRId32 ")"),
5464 source->name, err->message,
5465 (uint32_t)err->domain, (int32_t)err->code);
5466 fBufFree(fbuf);
5467 fbuf = NULL;
5468 }
5469 g_clear_error(&err);
5470 continue;
5471 }
5472
5473 /* In the event of an unhandled error, end the thread. */
5474 if (NULL == err) {
5475 NOTICEMSG(("'%s': fixbuf listener shutting down:"
5476 " unknown error from fBufNext"),
5477 source->name);
5478 } else {
5479 NOTICEMSG(("'%s': fixbuf listener shutting down: %s"
5480 " (d=%" PRIu32 ",c=%" PRId32 ")"),
5481 source->name, err->message,
5482 (uint32_t)err->domain, (int32_t)err->code);
5483 g_clear_error(&err);
5484 }
5485 break;
5486 }
5487
5488 TRACEMSG(3, ("base %p exited while() loop", base));
5489
5490 /* Free the fbuf if it exists. (If it's UDP, it will be freed by
5491 * the destruction of the listener below.) */
5492 if (fbuf && !IS_UDP) {
5493 TRACEMSG(3, ("base %p calling fBufFree", base));
5494 fBufFree(fbuf);
5495 }
5496
5497 /* Note that the thread is ending, and wait for
5498 * skIPFIXSourceDestroy() to mark this as destroyed */
5499 DEBUGMSG("fixbuf listener ending for [%s]:%s...",
5500 base->connspec->host ? base->connspec->host : "*",
5501 base->connspec->svc);
5502 pthread_mutex_lock(&base->mutex);
5503 while (!base->destroyed) {
5504 pthread_cond_wait(&base->cond, &base->mutex);
5505 }
5506
5507 TRACEMSG(3, ("base %p is set to destroyed", base));
5508
5509 /* Destroy the fbListener_t object. This destroys the fbuf if the
5510 * stream is UDP. */
5511 ipfixSourceBaseFreeListener(base);
5512
5513 /* Notify skIPFIXSourceDestroy() that the thread is ending */
5514 base->running = 0;
5515 pthread_cond_signal(&base->cond);
5516 DEBUGMSG("fixbuf listener ended for [%s]:%s.",
5517 base->connspec->host ? base->connspec->host : "*",
5518 base->connspec->svc);
5519
5520 pthread_mutex_unlock(&base->mutex);
5521
5522 TRACE_RETURN(NULL);
5523 #undef IS_UDP
5524 }
5525
5526
5527 /*
5528 * Requests a record from the file-based IPFIX source 'source'.
5529 *
5530 * Returns 0 on success, -1 on failure.
5531 */
5532 int
ipfixSourceGetRecordFromFile(skIPFIXSource_t * source,rwRec * ipfix_rec)5533 ipfixSourceGetRecordFromFile(
5534 skIPFIXSource_t *source,
5535 rwRec *ipfix_rec)
5536 {
5537 ski_record_t record;
5538 GError *err = NULL;
5539 int rv;
5540
5541 TRACE_ENTRY;
5542
5543 /* Reading from a file */
5544 pthread_mutex_lock(&source->base->mutex);
5545 assert(source->readbuf);
5546
5547 if (source->reverse) {
5548 /* A reverse record exists from the previous flow */
5549 memcpy(ipfix_rec, &source->rvbuf, sizeof(*ipfix_rec));
5550 ++source->reverse_flows;
5551 source->reverse = 0;
5552 } else {
5553 /* Initialize the control variable for the do{}while() loop.
5554 * 0: ignore; 1: uniflow; 2: biflow; -1: error */
5555 rv = 0;
5556 do {
5557 /* Similar to the switch() block in ipfix_reader() above */
5558 switch (ski_rectype_next(source->readbuf, &record, &err)) {
5559 case SKI_RECTYPE_ERROR:
5560 rv = -1;
5561 break; /* error */
5562
5563 case SKI_RECTYPE_NF9SAMPLING:
5564 case SKI_RECTYPE_IGNORE:
5565 if (!ski_ignore_next(
5566 source->readbuf, &record, source->probe, &err))
5567 {
5568 /* should have been able to read something */
5569 TRACEMSG(2, ("'%s': %s and ski_ignore_next() is FALSE",
5570 source->name,
5571 ski_rectype_name[record.rectype]));
5572 rv = -1;
5573 break; /* error */
5574 }
5575 continue;
5576
5577 case SKI_RECTYPE_YAFSTATS:
5578 if (!ski_yafstats_next(
5579 source->readbuf, &record, source->probe, &err))
5580 {
5581 /* should have been able to read the stats */
5582 TRACEMSG(2, ("'%s': %s and ski_yafstats_next() is FALSE",
5583 source->name,
5584 ski_rectype_name[record.rectype]));
5585 rv = -1;
5586 break; /* error */
5587 }
5588 ski_yafstats_update_source(
5589 source, &record, &source->prev_yafstats);
5590 continue;
5591
5592 case SKI_RECTYPE_TOMBSTONE:
5593 if (!ski_tombstone_next(
5594 source->readbuf, &record, source->probe, &err))
5595 {
5596 TRACEMSG(2, ("'%s': %s and ski_tombstone_next() is FALSE",
5597 source->name,
5598 ski_rectype_name[record.rectype]));
5599 rv = -1;
5600 break; /* error */
5601 }
5602 continue;
5603
5604 case SKI_RECTYPE_FIXREC:
5605 record.fwd_rec = ipfix_rec;
5606 record.rev_rec = &source->rvbuf;
5607 rv = ski_fixrec_next(source->readbuf, &record,
5608 source->probe, &err);
5609 if (rv == 0) {
5610 ++source->ignored_flows;
5611 }
5612 break;
5613
5614 case SKI_RECTYPE_YAFREC:
5615 record.fwd_rec = ipfix_rec;
5616 record.rev_rec = &source->rvbuf;
5617 rv = ski_yafrec_next(source->readbuf, &record,
5618 source->probe, &err);
5619 if (rv == 0) {
5620 ++source->ignored_flows;
5621 }
5622 break;
5623
5624 case SKI_RECTYPE_NF9REC:
5625 record.fwd_rec = ipfix_rec;
5626 record.rev_rec = &source->rvbuf;
5627 rv = ski_nf9rec_next(source->readbuf, &record,
5628 source->probe, &err);
5629 if (rv == 0) {
5630 ++source->ignored_flows;
5631 }
5632 break;
5633
5634 default:
5635 skAppPrintErr("Unexpected record type");
5636 skAbort();
5637 }
5638 } while (rv == 0); /* Continue while current record is ignored */
5639
5640 if (rv == -1) {
5641 /* End of file or other problem */
5642 g_clear_error(&err);
5643 pthread_mutex_unlock(&source->base->mutex);
5644 TRACE_RETURN(-1);
5645 }
5646
5647 assert(rv == 1 || rv == 2);
5648 ++source->forward_flows;
5649
5650 /* We have the next flow. Set reverse if there is a
5651 * reverse record. */
5652 source->reverse = (rv == 2);
5653 }
5654
5655 pthread_mutex_unlock(&source->base->mutex);
5656
5657 TRACE_RETURN(0);
5658 }
5659
5660
5661
5662
5663 /*
5664 * The check-struct application calls the skiCheckDataStructure()
5665 * function, and that function requires access to the C structures
5666 * and templates that are local to this file.
5667 *
 *    The check-struct.c file contains both the
 *    skiCheckDataStructure() function and a main() function to use
 *    for the application.  The skiCheckDataStructure() function is
 *    defined when SKIPFIX_SOURCE is defined; otherwise, the main()
 *    function is defined.
5673 */
5674 #include "check-struct.c"
5675
5676
5677 /*
5678 ** Local Variables:
5679 ** mode:c
5680 ** indent-tabs-mode:nil
5681 ** c-basic-offset:4
5682 ** End:
5683 */
5684