"""
========================================================================
src_sink_test
========================================================================
Tests for test sources and test sinks.

Author : Yanghui Ou
  Date : Mar 11, 2019
"""
import pytest

from pymtl3 import *

from ..test_helpers import run_sim
from ..test_sinks import PyMTLTestSinkError, TestSinkCL, TestSinkRTL
from ..test_srcs import TestSrcCL, TestSrcRTL

#-------------------------------------------------------------------------
# TestHarnessSimple
#-------------------------------------------------------------------------
# Test a single pair of test src/sink.

class TestHarnessSimple( Component ):
    """Harness that wires one test source directly to one test sink."""

    def construct( s, MsgType, SrcType, SinkType, src_msgs, sink_msgs ):
        """Instantiate the source and the sink and connect them.

        MsgType   -- forwarded as the first constructor argument of both
                     SrcType and SinkType.
        SrcType   -- class instantiated as `s.src`.
        SinkType  -- class instantiated as `s.sink`.
        src_msgs  -- message list handed to the source.
        sink_msgs -- message list handed to the sink.
        """
        s.src  = SrcType ( MsgType, src_msgs  )
        s.sink = SinkType( MsgType, sink_msgs )

        # The source's send interface drives the sink's recv interface.
        connect( s.src.send, s.sink.recv )

    def done( s ):
        """Return the conjunction of the source's and the sink's done()."""
        return s.src.done() and s.sink.done()

    def line_trace( s ):
        """Return the source trace and the sink trace joined by ' > '."""
        src_trace  = s.src.line_trace()
        sink_trace = s.sink.line_trace()
        return "{} > {}".format( src_trace, sink_trace )

#-------------------------------------------------------------------------
# Test cases
#-------------------------------------------------------------------------

def test_cl_no_delay():
    """Run a CL src/sink pair on four Bits16 messages without overriding
    any source or sink construction parameter."""
    msgs = [ Bits16( value ) for value in range( 4 ) ]
    th = TestHarnessSimple( Bits16, TestSrcCL, TestSinkCL, msgs, msgs )
    run_sim( th )

# Twelve messages: the values 0, 1, 2, 3 repeated three times.
# int_msgs = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
bit_msgs = [ Bits16( value ) for _ in range( 3 ) for value in range( 4 ) ]

# Tables passed to the sinks as their `arrival_time` parameter.  Every
# table except arrival1 has one entry per message in bit_msgs; arrival1
# has eleven entries and is only referenced by the disabled int_msgs case.
arrival0 = list( range(  1, 13    ) )  #  1,  2, ..., 12
arrival1 = list( range( 11, 22    ) )  # 11, 12, ..., 21
arrival2 = list( range( 11, 34, 2 ) )  # 11, 13, ..., 33
arrival3 = list( range( 11, 34, 2 ) )  # 11, 13, ..., 33
arrival4 = list( range(  5, 61, 5 ) )  #  5, 10, ..., 60
# Parameter names and delay configurations shared by the CL and the RTL
# src/sink tests below.
_src_sink_params = (
    'Type', 'msgs', 'src_init', 'src_intv',
    'sink_init', 'sink_intv', 'arrival_time',
)

_src_sink_cases = [
    ( Bits16, bit_msgs,  0, 0, 0, 0, arrival0 ),
    # ( int,  int_msgs, 10, 0, 0, 0, arrival1 ),
    ( Bits16, bit_msgs, 10, 1, 0, 0, arrival2 ),
    ( Bits16, bit_msgs, 10, 0, 0, 1, arrival3 ),
    ( Bits16, bit_msgs,  3, 4, 5, 3, arrival4 ),
]

def _run_src_sink( SrcType, SinkType, Type, msgs, src_init, src_intv,
                   sink_init, sink_intv, arrival_time ):
    """Simulate one SrcType/SinkType pair with the given delay settings.

    The same `msgs` list is used as both the source's messages and the
    sink's expected messages.  The delays and the arrival table are
    applied through set_param before the harness is handed to run_sim.
    """
    th = TestHarnessSimple( Type, SrcType, SinkType, msgs, msgs )

    src_kwargs = dict(
        initial_delay  = src_init,
        interval_delay = src_intv,
    )
    sink_kwargs = dict(
        initial_delay  = sink_init,
        interval_delay = sink_intv,
        arrival_time   = arrival_time,
    )

    th.set_param( "top.src.construct", **src_kwargs )
    th.set_param( "top.sink.construct", **sink_kwargs )
    run_sim( th )

@pytest.mark.parametrize( _src_sink_params, _src_sink_cases )
def test_src_sink_cl( Type, msgs, src_init, src_intv,
                      sink_init, sink_intv, arrival_time ):
    """Run every shared delay configuration on a TestSrcCL/TestSinkCL pair."""
    _run_src_sink( TestSrcCL, TestSinkCL, Type, msgs, src_init, src_intv,
                   sink_init, sink_intv, arrival_time )

@pytest.mark.parametrize( _src_sink_params, _src_sink_cases )
def test_src_sink_rtl( Type, msgs, src_init, src_intv,
                       sink_init, sink_intv, arrival_time ):
    """Run every shared delay configuration on a TestSrcRTL/TestSinkRTL pair."""
    _run_src_sink( TestSrcRTL, TestSinkRTL, Type, msgs, src_init, src_intv,
                   sink_init, sink_intv, arrival_time )

#-------------------------------------------------------------------------
# Adaptive composition test
#-------------------------------------------------------------------------
# This test attempts to mix-and-match different levels of test srcs and
# sinks for all possible combinations -- cl/cl, rtl/cl, cl/rtl, rtl/rtl.
# It also creates multiple src/sink pairs to stress the management of
# multiple instances of the same adapter class

class TestHarness( Component ):
    """Harness holding `num_pairs` independent src/sink pairs whose
    modelling levels ('cl' or 'rtl') are selected per side."""

    def construct( s, src_level, sink_level, MsgType, src_msgs, sink_msgs,
                   src_init, src_intv,
                   sink_init, sink_interval, arrival_time=None ):
        """Instantiate and connect the src/sink pairs.

        src_level, sink_level -- 'cl' or 'rtl'; selects TestSrcCL/TestSrcRTL
                                 and TestSinkCL/TestSinkRTL respectively.
        MsgType               -- forwarded to every source and sink.
        src_msgs, sink_msgs   -- message lists forwarded to every source
                                 and every sink respectively.
        src_init, src_intv    -- third and fourth positional arguments of
                                 each source.
        sink_init, sink_interval, arrival_time
                              -- third to fifth positional arguments of
                                 each sink.

        Raises ValueError if either level is not 'cl' or 'rtl'.
        """
        s.num_pairs = 2

        src_types  = { 'cl': TestSrcCL,  'rtl': TestSrcRTL  }
        sink_types = { 'cl': TestSinkCL, 'rtl': TestSinkRTL }

        # A bare `raise` with no active exception only produces
        # "RuntimeError: No active exception to reraise"; report the
        # offending argument instead.
        if src_level not in src_types:
            raise ValueError(
                "Unknown src_level {!r}; expected 'cl' or 'rtl'".format( src_level ) )
        if sink_level not in sink_types:
            raise ValueError(
                "Unknown sink_level {!r}; expected 'cl' or 'rtl'".format( sink_level ) )

        SrcType  = src_types[ src_level ]
        SinkType = sink_types[ sink_level ]

        s.srcs = [ SrcType( MsgType, src_msgs, src_init, src_intv )
                   for _ in range( s.num_pairs ) ]
        s.sinks = [ SinkType( MsgType, sink_msgs, sink_init, sink_interval,
                              arrival_time )
                    for _ in range( s.num_pairs ) ]

        # Connections
        for src, sink in zip( s.srcs, s.sinks ):
            connect( src.send, sink.recv )

    def done( s ):
        """Return True only when every source and every sink reports done."""
        return all( src.done() and sink.done()
                    for src, sink in zip( s.srcs, s.sinks ) )

    def line_trace( s ):
        """Return all source traces and all sink traces, each group joined
        by '|', separated by ' >>> '."""
        return "{} >>> {}".format( "|".join( x.line_trace() for x in s.srcs ),
                                   "|".join( x.line_trace() for x in s.sinks ) )

# Every src/sink level combination is paired with every delay configuration.
test_case_table = []
for src in ['cl', 'rtl']:
    for sink in ['cl', 'rtl']:
        test_case_table += [
            ( src, sink, bit_msgs,  0, 0, 0, 0, arrival0 ),
            # ( src, sink, int_msgs, 10, 0, 0, 0, arrival1 ),
            ( src, sink, bit_msgs, 10, 1, 0, 0, arrival2 ),
            ( src, sink, bit_msgs, 10, 0, 0, 1, arrival3 ),
            ( src, sink, bit_msgs,  3, 4, 5, 3, arrival4 ),
        ]

@pytest.mark.parametrize(
    ('src_level', 'sink_level', 'msgs',
     'src_init', 'src_intv', 'sink_init', 'sink_intv', 'arrival_time' ),
    test_case_table,
)
def test_adaptive( src_level, sink_level, msgs, src_init, src_intv,
                   sink_init, sink_intv, arrival_time ):
    """Simulate a TestHarness for one level combination and delay setting,
    using `msgs` as both the source and the expected sink messages."""
    th = TestHarness( src_level, sink_level, Bits16, msgs, msgs,
                      src_init, src_intv, sink_init,
                      sink_intv, arrival_time )
    run_sim( th )

#-------------------------------------------------------------------------
# Error message test
#-------------------------------------------------------------------------

def test_error_more_msg():
    """Expect PyMTLTestSinkError when the source has two messages but the
    sink expects only one."""
    with pytest.raises( PyMTLTestSinkError ):
        th = TestHarnessSimple(
            Bits16, TestSrcCL, TestSinkCL,
            src_msgs  = [ b16(0xface), b16(0xface) ],
            sink_msgs = [ b16(0xface) ],
        )
        run_sim( th )

def test_error_wrong_msg():
    """Expect PyMTLTestSinkError when the second source message differs
    from the second message the sink expects."""
    with pytest.raises( PyMTLTestSinkError ):
        th = TestHarnessSimple(
            Bits16, TestSrcCL, TestSinkCL,
            src_msgs  = [ b16(0xface), b16(0xface) ],
            sink_msgs = [ b16(0xface), b16(0xdead) ],
        )
        run_sim( th )

def test_error_late_msg():
    """Expect PyMTLTestSinkError when the source is configured with
    initial_delay=5 while the sink's arrival_time table is [1,2]."""
    with pytest.raises( PyMTLTestSinkError ):
        # NOTE(review): the second expected message (0xdead) also differs
        # from what the source sends, so a mismatch could satisfy this test
        # too; presumably the late-arrival check fires first -- confirm.
        th = TestHarnessSimple(
            Bits16, TestSrcCL, TestSinkCL,
            src_msgs  = [ b16(0xface), b16(0xface) ],
            sink_msgs = [ b16(0xface), b16(0xdead) ],
        )
        th.set_param( 'top.src.construct', initial_delay=5 )
        th.set_param( 'top.sink.construct', arrival_time=[1,2] )
        run_sim( th )

#-------------------------------------------------------------------------
# Customized compare function test
#-------------------------------------------------------------------------

def test_customized_cmp():
    """Run a sink whose cmp_fn compares only the [0:2] slice of each
    message, so messages that differ outside that slice are accepted."""
    th = TestHarnessSimple(
        Bits4, TestSrcCL, TestSinkCL,
        src_msgs  = [ b4(0b1110), b4(0b1111) ],
        sink_msgs = [ b4(0b0010), b4(0b0011) ],
    )
    th.set_param( 'top.sink.construct', cmp_fn=lambda a, b: a[0:2] == b[0:2] )
    run_sim( th )