1"""
2========================================================================
3src_sink_test
4========================================================================
5Tests for test sources and test sinks.
6
7Author : Yanghui Ou
8  Date : Mar 11, 2019
9"""
10import pytest
11
12from pymtl3 import *
13
14from ..test_helpers import run_sim
15from ..test_sinks import PyMTLTestSinkError, TestSinkCL, TestSinkRTL
16from ..test_srcs import TestSrcCL, TestSrcRTL
17
18#-------------------------------------------------------------------------
19# TestHarnessSimple
20#-------------------------------------------------------------------------
21# Test a single pair of test src/sink.
22
class TestHarnessSimple( Component ):
  """Harness that wires exactly one test source to one test sink."""

  def construct( s, MsgType, SrcType, SinkType, src_msgs, sink_msgs ):
    """Build the source and the sink, then connect send to recv.

    MsgType   -- message type forwarded to both SrcType and SinkType
    SrcType   -- source class, instantiated as SrcType( MsgType, src_msgs )
    SinkType  -- sink class, instantiated as SinkType( MsgType, sink_msgs )
    src_msgs  -- messages handed to the source
    sink_msgs -- messages handed to the sink
    """

    # The attribute names `src` and `sink` are referenced by the tests via
    # set_param( "top.src.construct", ... ) / "top.sink.construct".
    s.src  = SrcType ( MsgType, src_msgs  )
    s.sink = SinkType( MsgType, sink_msgs )

    connect( s.src.send, s.sink.recv )

  def done( s ):
    """Return the conjunction of the source's and the sink's done()."""
    src_finished = s.src.done()
    return src_finished and s.sink.done()

  def line_trace( s ):
    """Return the source trace and the sink trace joined by ' > '."""
    src_trace  = s.src.line_trace()
    sink_trace = s.sink.line_trace()
    return "{} > {}".format( src_trace, sink_trace )
37
38#-------------------------------------------------------------------------
39# Test cases
40#-------------------------------------------------------------------------
41
def test_cl_no_delay():
  """Run a CL source/sink pair on four Bits16 messages without
  overriding any construct parameter."""
  payload = [ Bits16( value ) for value in range( 4 ) ]
  # The same list serves as both the sent and the expected messages.
  harness = TestHarnessSimple( Bits16, TestSrcCL, TestSinkCL, payload, payload )
  run_sim( harness )
46
47# int_msgs = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
# Twelve Bits16 messages: the values 0, 1, 2, 3 repeated three times.
bit_msgs = [ Bits16( value ) for _ in range( 3 ) for value in range( 4 ) ]
51
# Lists passed as the sinks' `arrival_time` parameter by the parametrized
# tests in this file. arrival1 is only referenced by the commented-out
# int test cases; arrival2 and arrival3 hold equal values but are kept as
# separate list objects.
arrival0 = list( range(  1, 13 ) )      # 1, 2, ..., 12
arrival1 = list( range( 11, 22 ) )      # 11, 12, ..., 21
arrival2 = list( range( 11, 34, 2 ) )   # 11, 13, ..., 33
arrival3 = list( range( 11, 34, 2 ) )   # 11, 13, ..., 33
arrival4 = list( range(  5, 61, 5 ) )   # 5, 10, ..., 60
57
@pytest.mark.parametrize(
  ( 'Type', 'msgs', 'src_init', 'src_intv',
    'sink_init', 'sink_intv', 'arrival_time' ),
  [
    ( Bits16, bit_msgs,  0, 0, 0, 0, arrival0 ),
    # Disabled int case: ( int, int_msgs, 10, 0, 0, 0, arrival1 ),
    ( Bits16, bit_msgs, 10, 1, 0, 0, arrival2 ),
    ( Bits16, bit_msgs, 10, 0, 0, 1, arrival3 ),
    ( Bits16, bit_msgs,  3, 4, 5, 3, arrival4 ),
  ],
)
def test_src_sink_cl( Type, msgs, src_init, src_intv,
                      sink_init, sink_intv, arrival_time ):
  """Run a TestSrcCL/TestSinkCL pair with the given delays.

  The same `msgs` list is used as both the sent and the expected
  messages; the delays and `arrival_time` are forwarded to the source
  and sink constructors through set_param.
  """
  src_kwargs = dict(
    initial_delay  = src_init,
    interval_delay = src_intv,
  )
  sink_kwargs = dict(
    initial_delay  = sink_init,
    interval_delay = sink_intv,
    arrival_time   = arrival_time,
  )

  th = TestHarnessSimple( Type, TestSrcCL, TestSinkCL, msgs, msgs )
  th.set_param( "top.src.construct",  **src_kwargs  )
  th.set_param( "top.sink.construct", **sink_kwargs )
  run_sim( th )
82
@pytest.mark.parametrize(
  ( 'Type', 'msgs', 'src_init', 'src_intv',
    'sink_init', 'sink_intv', 'arrival_time' ),
  [
    ( Bits16, bit_msgs,  0, 0, 0, 0, arrival0 ),
    # Disabled int case: ( int, int_msgs, 10, 0, 0, 0, arrival1 ),
    ( Bits16, bit_msgs, 10, 1, 0, 0, arrival2 ),
    ( Bits16, bit_msgs, 10, 0, 0, 1, arrival3 ),
    ( Bits16, bit_msgs,  3, 4, 5, 3, arrival4 ),
  ],
)
def test_src_sink_rtl( Type, msgs, src_init, src_intv,
                       sink_init, sink_intv, arrival_time ):
  """Run a TestSrcRTL/TestSinkRTL pair with the given delays.

  The same `msgs` list is used as both the sent and the expected
  messages; the delays and `arrival_time` are forwarded to the source
  and sink constructors through set_param.
  """
  src_kwargs = dict(
    initial_delay  = src_init,
    interval_delay = src_intv,
  )
  sink_kwargs = dict(
    initial_delay  = sink_init,
    interval_delay = sink_intv,
    arrival_time   = arrival_time,
  )

  th = TestHarnessSimple( Type, TestSrcRTL, TestSinkRTL, msgs, msgs )
  th.set_param( "top.src.construct",  **src_kwargs  )
  th.set_param( "top.sink.construct", **sink_kwargs )
  run_sim( th )
107
108#-------------------------------------------------------------------------
109# Adaptive composition test
110#-------------------------------------------------------------------------
111# This test attempts to mix-and-match different levels of test srcs and
112# sinks for all possible combinations -- cl/cl, rtl/cl, cl/rtl, rtl/rtl.
113# It also creates multiple src/sink pairs to stress the management of
114# multiple instances of the same adapter class
115
class TestHarness( Component ):
  """Harness holding several independent src/sink pairs whose modeling
  level ('cl' or 'rtl') is chosen separately for sources and sinks."""

  def construct( s, src_level, sink_level, MsgType, src_msgs, sink_msgs,
                 src_init, src_intv,
                 sink_init, sink_interval, arrival_time=None ):
    """Instantiate `num_pairs` sources and sinks and connect them pairwise.

    src_level     -- 'cl' selects TestSrcCL, 'rtl' selects TestSrcRTL
    sink_level    -- 'cl' selects TestSinkCL, 'rtl' selects TestSinkRTL
    MsgType       -- message type passed to every source and sink
    src_msgs      -- messages passed to every source
    sink_msgs     -- messages passed to every sink
    src_init      -- third positional argument of every source
    src_intv      -- fourth positional argument of every source
    sink_init     -- third positional argument of every sink
    sink_interval -- fourth positional argument of every sink
    arrival_time  -- fifth positional argument of every sink

    Raises ValueError if src_level or sink_level is neither 'cl' nor 'rtl'.
    """
    src_types  = { 'cl': TestSrcCL,  'rtl': TestSrcRTL  }
    sink_types = { 'cl': TestSinkCL, 'rtl': TestSinkRTL }

    # Validate both levels up front. A bare `raise` with no active
    # exception only yields "No active exception to reraise", which hides
    # the actual problem from whoever mistyped the level.
    if src_level not in src_types:
      raise ValueError(
        "Unknown src_level {!r}; expected 'cl' or 'rtl'".format( src_level ) )
    if sink_level not in sink_types:
      raise ValueError(
        "Unknown sink_level {!r}; expected 'cl' or 'rtl'".format( sink_level ) )

    SrcType  = src_types [ src_level  ]
    SinkType = sink_types[ sink_level ]

    s.num_pairs = 2

    s.srcs = [ SrcType( MsgType, src_msgs, src_init, src_intv )
               for _ in range( s.num_pairs ) ]

    s.sinks = [ SinkType( MsgType, sink_msgs, sink_init, sink_interval, arrival_time )
                for _ in range( s.num_pairs ) ]

    # Connections: the i-th source feeds the i-th sink.
    for src, sink in zip( s.srcs, s.sinks ):
      connect( src.send, sink.recv )

  def done( s ):
    """Return True only when every source and every sink reports done()."""
    return all( src.done() and sink.done()
                for src, sink in zip( s.srcs, s.sinks ) )

  def line_trace( s ):
    """Return all source traces and all sink traces, each group joined by
    '|', separated by ' >>> '."""
    src_traces  = "|".join( [ x.line_trace() for x in s.srcs  ] )
    sink_traces = "|".join( [ x.line_trace() for x in s.sinks ] )
    return "{} >>> {}".format( src_traces, sink_traces )
154
# Parameter table for test_adaptive: every src/sink level combination
# crossed with the same four delay configurations.
test_case_table = []
for src in ( 'cl', 'rtl' ):
  for sink in ( 'cl', 'rtl' ):
    test_case_table.extend(
      ( src, sink, bit_msgs ) + timing
      for timing in (
        (  0, 0, 0, 0, arrival0 ),
        # Disabled int case: ( src, sink, int_msgs, 10, 0, 0, 0, arrival1 ),
        ( 10, 1, 0, 0, arrival2 ),
        ( 10, 0, 0, 1, arrival3 ),
        (  3, 4, 5, 3, arrival4 ),
      )
    )
165
@pytest.mark.parametrize(
  ( 'src_level', 'sink_level', 'msgs',
    'src_init', 'src_intv', 'sink_init', 'sink_intv', 'arrival_time' ),
  test_case_table,
)
def test_adaptive( src_level, sink_level, msgs, src_init, src_intv,
                   sink_init, sink_intv, arrival_time ):
  """Run the multi-pair TestHarness for one level/delay combination.

  `msgs` is used as both the sent and the expected messages, and the
  message type is always Bits16.
  """
  harness_args = (
    src_level, sink_level, Bits16,
    msgs, msgs,
    src_init, src_intv,
    sink_init, sink_intv,
    arrival_time,
  )
  run_sim( TestHarness( *harness_args ) )
177
178#-------------------------------------------------------------------------
179# Error message test
180#-------------------------------------------------------------------------
181
def test_error_more_msg():
  """Expect PyMTLTestSinkError when the source is given two messages but
  the sink is given only one."""
  th = TestHarnessSimple(
    Bits16, TestSrcCL, TestSinkCL,
    src_msgs  = [ b16(0xface), b16(0xface) ],
    sink_msgs = [ b16(0xface) ],
  )
  # pytest.raises fails the test itself if no PyMTLTestSinkError is
  # raised, so no manual "failed to detect" exception is needed.
  with pytest.raises( PyMTLTestSinkError ):
    run_sim( th )
193
def test_error_wrong_msg():
  """Expect PyMTLTestSinkError when the second message given to the sink
  (0xdead) differs from the second message given to the source (0xface)."""
  th = TestHarnessSimple(
    Bits16, TestSrcCL, TestSinkCL,
    src_msgs  = [ b16(0xface), b16(0xface) ],
    sink_msgs = [ b16(0xface), b16(0xdead) ],
  )
  # pytest.raises fails the test itself if no PyMTLTestSinkError is
  # raised, so no manual "fail to detect" exception is needed.
  with pytest.raises( PyMTLTestSinkError ):
    run_sim( th )
205
def test_error_late_msg():
  """Expect PyMTLTestSinkError when the source is configured with
  initial_delay=5 while the sink is configured with arrival_time=[1,2]."""
  th = TestHarnessSimple(
    Bits16, TestSrcCL, TestSinkCL,
    src_msgs  = [ b16(0xface), b16(0xface) ],
    # NOTE(review): the second expected message (0xdead) also differs from
    # what the source sends, so a value mismatch could raise the same error
    # type -- confirm that the late-arrival check is what fires here.
    sink_msgs = [ b16(0xface), b16(0xdead) ],
  )
  th.set_param( 'top.src.construct', initial_delay=5 )
  th.set_param( 'top.sink.construct', arrival_time=[1,2] )
  # pytest.raises fails the test itself if no PyMTLTestSinkError is
  # raised, so no manual "fail to detect" exception is needed.
  with pytest.raises( PyMTLTestSinkError ):
    run_sim( th )
219
220#-------------------------------------------------------------------------
221# Customized compare function test
222#-------------------------------------------------------------------------
223
def test_customized_cmp():
  """Run a CL pair whose sink is given a custom `cmp_fn`.

  The sent and expected Bits4 messages are not equal as a whole, but the
  supplied comparison only looks at the [0:2] slice of each message.
  """
  def slices_match( a, b ):
    # Compare only the [0:2] slice of the two messages.
    return a[0:2] == b[0:2]

  th = TestHarnessSimple(
    Bits4, TestSrcCL, TestSinkCL,
    src_msgs  = [ b4(0b1110), b4(0b1111) ],
    sink_msgs = [ b4(0b0010), b4(0b0011) ],
  )
  th.set_param( 'top.sink.construct', cmp_fn=slices_match )
  run_sim( th )
232