1#!/bin/bash
2# added 2018-04-06 by richm, released under ASL 2.0
3#
4# Note: on buildbot VMs (where there is no environment cleanup), the
5# kubernetes test server may be kept running if the script aborts or
6# is aborted (buildbot master failure!) for some reason. As such we
# execute it under "timeout" control, which ensures it is always
8# terminated. It's not a 100% great method, but hopefully does the
9# trick. -- rgerhards, 2018-07-21
# Source diag.sh with the "init" argument; srcdir falls back to "." when it
# is unset or empty. The helper commands called throughout this script are
# not defined in this file (presumably they come from diag.sh).
. "${srcdir:=.}/diag.sh" init
# "timeout" is needed further down to bound the lifetime of the test server.
check_command_available timeout
# Current directory; used to build the absolute paths in filenamerules below.
pwd=$(pwd)
k8s_srv_port=$(get_free_port)
generate_conf
# Value passed as cacheentryttl; the test later sleeps this many seconds
# before writing the record that is expected to miss the cache.
cachettl=10
# Every shell expansion spliced into the configuration text is double-quoted
# so that the whole text reaches add_conf as ONE argument, even if a value
# contains whitespace or glob characters. The backticks around
# "echo $RSYSLOG_OUT_LOG" sit inside single quotes and are therefore passed
# through literally, not run by this shell.
add_conf '
global(workDirectory="'"$RSYSLOG_DYNNAME.spool"'")
module(load="../plugins/impstats/.libs/impstats" interval="1"
	   log.file="'"$RSYSLOG_DYNNAME.spool"'/mmkubernetes-stats.log" log.syslog="off" format="cee")
module(load="../plugins/imfile/.libs/imfile")
module(load="../plugins/mmjsonparse/.libs/mmjsonparse")
module(load="../contrib/mmkubernetes/.libs/mmkubernetes")

template(name="mmk8s_template" type="list") {
    property(name="$!all-json-plain")
    constant(value="\n")
}

input(type="imfile" file="'"$RSYSLOG_DYNNAME.spool"'/pod-*.log" tag="kubernetes" addmetadata="on")
action(type="mmjsonparse" cookie="")
action(type="mmkubernetes" token="dummy" kubernetesurl="http://localhost:'"$k8s_srv_port"'"
       cacheexpireinterval="1" cacheentryttl="'"$cachettl"'"
       filenamerules=["rule=:'"$pwd/$RSYSLOG_DYNNAME.spool"'/%pod_name:char-to:.%.%container_hash:char-to:_%_%namespace_name:char-to:_%_%container_name_and_id:char-to:.%.log",
	                  "rule=:'"$pwd/$RSYSLOG_DYNNAME.spool"'/%pod_name:char-to:_%_%namespace_name:char-to:_%_%container_name_and_id:char-to:.%.log"]
)
action(type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="mmk8s_template")
'
38
# Start the kubernetes API "emulator" in the background. It runs under
# "timeout 2m" so that it is terminated even if this script is aborted
# before it reaches the kill further down.
testsrv=mmk8s-test-server
echo starting kubernetes \"emulator\"
# $PYTHON is intentionally left unquoted (it may carry interpreter options);
# all path and port arguments are quoted so they cannot be word-split.
timeout 2m $PYTHON -u "$srcdir/mmkubernetes_test_server.py" "$k8s_srv_port" \
	"${RSYSLOG_DYNNAME}${testsrv}.pid" "${RSYSLOG_DYNNAME}${testsrv}.started" \
	> "${RSYSLOG_DYNNAME}.spool/mmk8s_srv.log" 2>&1 &
# $! is the PID of the backgrounded "timeout" command.
BGPROCESS=$!
wait_process_startup "${RSYSLOG_DYNNAME}${testsrv}" "${RSYSLOG_DYNNAME}${testsrv}.started"
echo "background mmkubernetes_test_server.py process id is $BGPROCESS"
45
# Call the valgrind flavour of the startup helper when USE_VALGRIND is
# exactly "YES" (registering the mmkubernetes suppressions file first);
# in every other case call the plain startup helper.
case "${USE_VALGRIND:-NO}" in
YES)
	export EXTRA_VALGRIND_SUPPRESSIONS="--suppressions=$srcdir/mmkubernetes.supp"
	startup_vg
	;;
*)
	startup
	# Debugging aid: when USE_GDB is non-empty, pause for a long time so
	# that a debugger can be attached by hand.
	if [[ -n "${USE_GDB:-}" ]]; then
		echo attach gdb here
		sleep 54321 || :
	fi
	;;
esac
56
# Write four records for the same pod/namespace to the file watched by the
# imfile input. Going by the expectedvalues table in the stats check further
# down in this script:
# - record 1 primes the cache: podcachemisses and namespacecachemisses go
#   from 0 to 1, and both caches hold one entry
# - records 2 and 3 are answered from the pod cache: podcachehits rises to 2
#   while namespacecachehits stays at 0
# - record 4 is written only after sleeping $cachettl seconds, so it should
#   no longer be cached: hits do not increase, both miss counters rise to 2
#   and the entry counts are back at 1
pod_log="${RSYSLOG_DYNNAME}.spool/pod-name.log"

# Print test record number $1 as a single JSON line on stdout. $1 is used
# for both the "message" suffix and the "testid" value; everything else is
# identical for all records.
mmk8s_test_record() {
	printf '{"message":"msg%s","CONTAINER_NAME":"some-prefix_container-name1_pod-name1_namespace-name1_unused1_unused11","CONTAINER_ID_FULL":"id1","testid":%s}\n' "$1" "$1"
}

# The first write creates/truncates the file, the following ones append.
mmk8s_test_record 1 > "$pod_log"
sleep 2
mmk8s_test_record 2 >> "$pod_log"
sleep 2
mmk8s_test_record 3 >> "$pod_log"
sleep "$cachettl"
mmk8s_test_record 4 >> "$pod_log"
82
# Wait for the records written above to be processed, then request shutdown.
wait_queueempty
shutdown_when_empty

# Use the valgrind-specific wait/exit-check helpers only when USE_VALGRIND
# is exactly "YES".
case "${USE_VALGRIND:-NO}" in
YES)
	wait_shutdown_vg
	check_exit_vg
	;;
*)
	wait_shutdown
	;;
esac

# Signal the background job started for the kubernetes test server and wait
# on its pid file.
kill $BGPROCESS
wait_pid_termination ${RSYSLOG_DYNNAME}${testsrv}.pid
95
rc=0
# For every record in mmkubernetes-cache-expire.out.expected that carries a
# "testid", require a record with the same testid in $RSYSLOG_OUT_LOG and
# require each expected key/value pair to be present in it.
# Python argv: 1 = expected-records JSON file, 2 = rsyslog output file,
# 3 = port of the kubernetes test server. A non-zero exit status is stored
# in rc.
$PYTHON -c 'import sys,json

def load_expected(path, port):
	# Index the expected records by testid; a kubernetes.master_url value
	# is a format template into which the real server port is inserted.
	wanted = {}
	with open(path) as fh:
		for rec in json.load(fh):
			if "testid" not in rec:
				continue
			if "kubernetes" in rec and "master_url" in rec["kubernetes"]:
				k8s = rec["kubernetes"]
				k8s["master_url"] = k8s["master_url"].format(k8s_srv_port=port)
			wanted[rec["testid"]] = rec
	return wanted

def load_actual(path):
	# Index the output records (one JSON document per line) by testid.
	seen = {}
	with open(path) as fh:
		for text in fh:
			rec = json.loads(text)
			if "testid" in rec:
				seen[rec["testid"]] = rec
	return seen

k8s_srv_port = sys.argv[3]
expected = load_expected(sys.argv[1], k8s_srv_port)
actual = load_actual(sys.argv[2])
status = 0
for testid, want in expected.items():
	if testid not in actual:
		print("Error: record for testid {0} not found in output".format(testid))
		status = 1
		continue
	got = actual[testid]
	for name, value in want.items():
		if name not in got:
			print("Error: key {0} in record for testid {1} not found in output".format(name, testid))
			status = 1
		elif got[name] != value:
			print("Error: value {0} for key {1} in record for testid {2} does not match the expected value {3}".format(str(got[name]), name, testid, str(value)))
			status = 1
sys.exit(status)
' $srcdir/mmkubernetes-cache-expire.out.expected $RSYSLOG_OUT_LOG $k8s_srv_port || rc=$?
127
# Validate the mmkubernetes counters in the stats log written by impstats.
# rc is set non-zero when the stats file is missing, when any counter
# differs from the expected value, or when the checker itself fails.
# NOTE(review): mismatches are now fatal. The stats are sampled every second
# (impstats interval="1"); confirm that no sample can observe a record that
# is only partly accounted for.
stats_log="${RSYSLOG_DYNNAME}.spool/mmkubernetes-stats.log"
if [ -f "$stats_log" ] ; then
	$PYTHON -c '
import sys,json
# key is recordseen, value is hash of stats for that record
expectedvalues = {
	1: {"namespacecachenumentries": 1, "podcachenumentries": 1, "namespacecachehits": 0,
	    "podcachehits": 0, "namespacecachemisses": 1, "podcachemisses": 1},
	2: {"namespacecachenumentries": 1, "podcachenumentries": 1, "namespacecachehits": 0,
	    "podcachehits": 1, "namespacecachemisses": 1, "podcachemisses": 1},
	3: {"namespacecachenumentries": 1, "podcachenumentries": 1, "namespacecachehits": 0,
	    "podcachehits": 2, "namespacecachemisses": 1, "podcachemisses": 1},
	4: {"namespacecachenumentries": 1, "podcachenumentries": 1, "namespacecachehits": 0,
	    "podcachehits": 2, "namespacecachemisses": 2, "podcachemisses": 2},
}
rc = 0
for line in sys.stdin:
	# only the part of the line starting at the first "{" is parsed as JSON
	jstart = line.find("{")
	if jstart < 0:
		continue
	hsh = json.loads(line[jstart:])
	if hsh.get("origin") != "mmkubernetes" or hsh["recordseen"] not in expectedvalues:
		continue
	expected = expectedvalues[hsh["recordseen"]]
	for key in expected:
		# .get() so that a missing stat is reported as None, not a KeyError
		actual = hsh.get(key)
		if expected[key] != actual:
			print("Error: expected value [{0}] not equal to actual value [{1}] in record [{2}] for stat [{3}]".format(
				expected[key], actual, hsh["recordseen"], key))
			rc = 1
# a non-zero exit status is what triggers the shell error handler below
sys.exit(rc)
' < "$stats_log" || { rc=$?; echo "error: expected stats not found in $stats_log"; }
else
	echo "error: stats file $stats_log not found"
	rc=1
fi
157
# If any of the checks above left a non-zero rc, print the test server log,
# the rsyslog output file and the stats log (in that order) and report the
# failure through error_exit.
if [[ ${rc:-0} -ne 0 ]]; then
	echo ""
	echo "FAIL: expected data not found.  $RSYSLOG_OUT_LOG is:"
	cat ${RSYSLOG_DYNNAME}.spool/mmk8s_srv.log \
		$RSYSLOG_OUT_LOG \
		${RSYSLOG_DYNNAME}.spool/mmkubernetes-stats.log
	error_exit 1
fi

exit_test
168