1# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#    http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# vim: tabstop=4 shiftwidth=4 softtabstop=4
17
18import unittest
19import logging
20import struct
21
22from nose.tools import eq_
23from ryu.lib.packet import bpdu
24
25
26LOG = logging.getLogger(__name__)
27
28
29class Test_ConfigurationBPDUs(unittest.TestCase):
30    """ Test case for ConfigurationBPDUs
31    """
32
33    def setUp(self):
34        self.protocol_id = bpdu.PROTOCOL_IDENTIFIER
35        self.version_id = bpdu.ConfigurationBPDUs.VERSION_ID
36        self.bpdu_type = bpdu.ConfigurationBPDUs.BPDU_TYPE
37        self.flags = 0b00000001
38        self.root_priority = 4096
39        self.root_system_id_extension = 1
40        self.root_mac_address = '12:34:56:78:9a:bc'
41        self.root_path_cost = 2
42        self.bridge_priority = 8192
43        self.bridge_system_id_extension = 3
44        self.bridge_mac_address = 'aa:aa:aa:aa:aa:aa'
45        self.port_priority = 16
46        self.port_number = 4
47        self.message_age = 5
48        self.max_age = 6
49        self.hello_time = 7
50        self.forward_delay = 8
51
52        self.msg = bpdu.ConfigurationBPDUs(
53            flags=self.flags,
54            root_priority=self.root_priority,
55            root_system_id_extension=self.root_system_id_extension,
56            root_mac_address=self.root_mac_address,
57            root_path_cost=self.root_path_cost,
58            bridge_priority=self.bridge_priority,
59            bridge_system_id_extension=self.bridge_system_id_extension,
60            bridge_mac_address=self.bridge_mac_address,
61            port_priority=self.port_priority,
62            port_number=self.port_number,
63            message_age=self.message_age,
64            max_age=self.max_age,
65            hello_time=self.hello_time,
66            forward_delay=self.forward_delay)
67
68        self.fmt = (bpdu.bpdu._PACK_STR
69                    + bpdu.ConfigurationBPDUs._PACK_STR[1:])
70        self.buf = struct.pack(self.fmt,
71                               self.protocol_id, self.version_id,
72                               self.bpdu_type, self.flags,
73                               bpdu.ConfigurationBPDUs.encode_bridge_id(
74                                   self.root_priority,
75                                   self.root_system_id_extension,
76                                   self.root_mac_address),
77                               self.root_path_cost,
78                               bpdu.ConfigurationBPDUs.encode_bridge_id(
79                                   self.bridge_priority,
80                                   self.bridge_system_id_extension,
81                                   self.bridge_mac_address),
82                               bpdu.ConfigurationBPDUs.encode_port_id(
83                                   self.port_priority,
84                                   self.port_number),
85                               bpdu.ConfigurationBPDUs._encode_timer(
86                                   self.message_age),
87                               bpdu.ConfigurationBPDUs._encode_timer(
88                                   self.max_age),
89                               bpdu.ConfigurationBPDUs._encode_timer(
90                                   self.hello_time),
91                               bpdu.ConfigurationBPDUs._encode_timer(
92                                   self.forward_delay))
93
94    def test_init(self):
95        eq_(self.protocol_id, self.msg._protocol_id)
96        eq_(self.version_id, self.msg._version_id)
97        eq_(self.bpdu_type, self.msg._bpdu_type)
98        eq_(self.flags, self.msg.flags)
99        eq_(self.root_priority, self.msg.root_priority)
100        eq_(self.root_system_id_extension,
101            self.msg.root_system_id_extension)
102        eq_(self.root_mac_address, self.msg.root_mac_address)
103        eq_(self.root_path_cost, self.msg.root_path_cost)
104        eq_(self.bridge_priority, self.msg.bridge_priority)
105        eq_(self.bridge_system_id_extension,
106            self.msg.bridge_system_id_extension)
107        eq_(self.bridge_mac_address, self.msg.bridge_mac_address)
108        eq_(self.port_priority, self.msg.port_priority)
109        eq_(self.port_number, self.msg.port_number)
110        eq_(self.message_age, self.msg.message_age)
111        eq_(self.max_age, self.msg.max_age)
112        eq_(self.hello_time, self.msg.hello_time)
113        eq_(self.forward_delay, self.msg.forward_delay)
114
115    def test_parser(self):
116        r1, r2, _ = bpdu.bpdu.parser(self.buf)
117
118        eq_(type(r1), type(self.msg))
119        eq_(r1._protocol_id, self.protocol_id)
120        eq_(r1._version_id, self.version_id)
121        eq_(r1._bpdu_type, self.bpdu_type)
122        eq_(r1.flags, self.flags)
123        eq_(r1.root_priority, self.root_priority)
124        eq_(r1.root_system_id_extension, self.root_system_id_extension)
125        eq_(r1.root_mac_address, self.root_mac_address)
126        eq_(r1.root_path_cost, self.root_path_cost)
127        eq_(r1.bridge_priority, self.bridge_priority)
128        eq_(r1.bridge_system_id_extension, self.bridge_system_id_extension)
129        eq_(r1.bridge_mac_address, self.bridge_mac_address)
130        eq_(r1.port_priority, self.port_priority)
131        eq_(r1.port_number, self.port_number)
132        eq_(r1.message_age, self.message_age)
133        eq_(r1.max_age, self.max_age)
134        eq_(r1.hello_time, self.hello_time)
135        eq_(r1.forward_delay, self.forward_delay)
136        eq_(r2, None)
137
138    def test_serialize(self):
139        data = bytearray()
140        prev = None
141        buf = self.msg.serialize(data, prev)
142        res = struct.unpack(self.fmt, buf)
143
144        eq_(res[0], self.protocol_id)
145        eq_(res[1], self.version_id)
146        eq_(res[2], self.bpdu_type)
147        eq_(res[3], self.flags)
148        eq_(bpdu.ConfigurationBPDUs._decode_bridge_id(res[4]),
149            (self.root_priority,
150             self.root_system_id_extension,
151             self.root_mac_address))
152        eq_(res[5], self.root_path_cost)
153        eq_(bpdu.ConfigurationBPDUs._decode_bridge_id(res[6]),
154            (self.bridge_priority,
155             self.bridge_system_id_extension,
156             self.bridge_mac_address))
157        eq_(bpdu.ConfigurationBPDUs._decode_port_id(res[7]),
158            (self.port_priority,
159             self.port_number))
160        eq_(bpdu.ConfigurationBPDUs._decode_timer(res[8]), self.message_age)
161        eq_(bpdu.ConfigurationBPDUs._decode_timer(res[9]), self.max_age)
162        eq_(bpdu.ConfigurationBPDUs._decode_timer(res[10]), self.hello_time)
163        eq_(bpdu.ConfigurationBPDUs._decode_timer(res[11]), self.forward_delay)
164
165    def test_json(self):
166        jsondict = self.msg.to_jsondict()
167        msg = bpdu.ConfigurationBPDUs.from_jsondict(
168            jsondict['ConfigurationBPDUs'])
169        eq_(str(self.msg), str(msg))
170
171
172class Test_TopologyChangeNotificationBPDUs(unittest.TestCase):
173    """ Test case for TopologyChangeNotificationBPDUs
174    """
175
176    def setUp(self):
177        self.protocol_id = bpdu.PROTOCOL_IDENTIFIER
178        self.version_id = bpdu.TopologyChangeNotificationBPDUs.VERSION_ID
179        self.bpdu_type = bpdu.TopologyChangeNotificationBPDUs.BPDU_TYPE
180
181        self.msg = bpdu.TopologyChangeNotificationBPDUs()
182
183        self.fmt = bpdu.bpdu._PACK_STR
184        self.buf = struct.pack(self.fmt,
185                               self.protocol_id,
186                               self.version_id,
187                               self.bpdu_type)
188
189    def test_init(self):
190        eq_(self.protocol_id, self.msg._protocol_id)
191        eq_(self.version_id, self.msg._version_id)
192        eq_(self.bpdu_type, self.msg._bpdu_type)
193
194    def test_parser(self):
195        r1, r2, _ = bpdu.bpdu.parser(self.buf)
196
197        eq_(type(r1), type(self.msg))
198        eq_(r1._protocol_id, self.protocol_id)
199        eq_(r1._version_id, self.version_id)
200        eq_(r1._bpdu_type, self.bpdu_type)
201        eq_(r2, None)
202
203    def test_serialize(self):
204        data = bytearray()
205        prev = None
206        buf = self.msg.serialize(data, prev)
207        res = struct.unpack(self.fmt, buf)
208
209        eq_(res[0], self.protocol_id)
210        eq_(res[1], self.version_id)
211        eq_(res[2], self.bpdu_type)
212
213    def test_json(self):
214        jsondict = self.msg.to_jsondict()
215        msg = bpdu.TopologyChangeNotificationBPDUs.from_jsondict(
216            jsondict['TopologyChangeNotificationBPDUs'])
217        eq_(str(self.msg), str(msg))
218
219
220class Test_RstBPDUs(unittest.TestCase):
221    """ Test case for RstBPDUs
222    """
223
224    def setUp(self):
225        self.protocol_id = bpdu.PROTOCOL_IDENTIFIER
226        self.version_id = bpdu.RstBPDUs.VERSION_ID
227        self.bpdu_type = bpdu.RstBPDUs.BPDU_TYPE
228        self.flags = 0b01111110
229        self.root_priority = 4096
230        self.root_system_id_extension = 1
231        self.root_mac_address = '12:34:56:78:9a:bc'
232        self.root_path_cost = 2
233        self.bridge_priority = 8192
234        self.bridge_system_id_extension = 3
235        self.bridge_mac_address = 'aa:aa:aa:aa:aa:aa'
236        self.port_priority = 16
237        self.port_number = 4
238        self.message_age = 5
239        self.max_age = 6
240        self.hello_time = 7
241        self.forward_delay = 8
242        self.version_1_length = bpdu.VERSION_1_LENGTH
243
244        self.msg = bpdu.RstBPDUs(
245            flags=self.flags,
246            root_priority=self.root_priority,
247            root_system_id_extension=self.root_system_id_extension,
248            root_mac_address=self.root_mac_address,
249            root_path_cost=self.root_path_cost,
250            bridge_priority=self.bridge_priority,
251            bridge_system_id_extension=self.bridge_system_id_extension,
252            bridge_mac_address=self.bridge_mac_address,
253            port_priority=self.port_priority,
254            port_number=self.port_number,
255            message_age=self.message_age,
256            max_age=self.max_age,
257            hello_time=self.hello_time,
258            forward_delay=self.forward_delay)
259
260        self.fmt = (bpdu.bpdu._PACK_STR
261                    + bpdu.ConfigurationBPDUs._PACK_STR[1:]
262                    + bpdu.RstBPDUs._PACK_STR[1:])
263        self.buf = struct.pack(self.fmt,
264                               self.protocol_id, self.version_id,
265                               self.bpdu_type, self.flags,
266                               bpdu.RstBPDUs.encode_bridge_id(
267                                   self.root_priority,
268                                   self.root_system_id_extension,
269                                   self.root_mac_address),
270                               self.root_path_cost,
271                               bpdu.RstBPDUs.encode_bridge_id(
272                                   self.bridge_priority,
273                                   self.bridge_system_id_extension,
274                                   self.bridge_mac_address),
275                               bpdu.RstBPDUs.encode_port_id(
276                                   self.port_priority,
277                                   self.port_number),
278                               bpdu.RstBPDUs._encode_timer(self.message_age),
279                               bpdu.RstBPDUs._encode_timer(self.max_age),
280                               bpdu.RstBPDUs._encode_timer(self.hello_time),
281                               bpdu.RstBPDUs._encode_timer(self.forward_delay),
282                               self.version_1_length)
283
284    def test_init(self):
285        eq_(self.protocol_id, self.msg._protocol_id)
286        eq_(self.version_id, self.msg._version_id)
287        eq_(self.bpdu_type, self.msg._bpdu_type)
288        eq_(self.flags, self.msg.flags)
289        eq_(self.root_priority, self.msg.root_priority)
290        eq_(self.root_system_id_extension,
291            self.msg.root_system_id_extension)
292        eq_(self.root_mac_address, self.msg.root_mac_address)
293        eq_(self.root_path_cost, self.msg.root_path_cost)
294        eq_(self.bridge_priority, self.msg.bridge_priority)
295        eq_(self.bridge_system_id_extension,
296            self.msg.bridge_system_id_extension)
297        eq_(self.bridge_mac_address, self.msg.bridge_mac_address)
298        eq_(self.port_priority, self.msg.port_priority)
299        eq_(self.port_number, self.msg.port_number)
300        eq_(self.message_age, self.msg.message_age)
301        eq_(self.max_age, self.msg.max_age)
302        eq_(self.hello_time, self.msg.hello_time)
303        eq_(self.forward_delay, self.msg.forward_delay)
304        eq_(self.version_1_length, self.msg._version_1_length)
305
306    def test_parser(self):
307        r1, r2, _ = bpdu.bpdu.parser(self.buf)
308
309        eq_(type(r1), type(self.msg))
310        eq_(r1._protocol_id, self.protocol_id)
311        eq_(r1._version_id, self.version_id)
312        eq_(r1._bpdu_type, self.bpdu_type)
313        eq_(r1.flags, self.flags)
314        eq_(r1.root_priority, self.root_priority)
315        eq_(r1.root_system_id_extension, self.root_system_id_extension)
316        eq_(r1.root_mac_address, self.root_mac_address)
317        eq_(r1.root_path_cost, self.root_path_cost)
318        eq_(r1.bridge_priority, self.bridge_priority)
319        eq_(r1.bridge_system_id_extension, self.bridge_system_id_extension)
320        eq_(r1.bridge_mac_address, self.bridge_mac_address)
321        eq_(r1.port_priority, self.port_priority)
322        eq_(r1.port_number, self.port_number)
323        eq_(r1.message_age, self.message_age)
324        eq_(r1.max_age, self.max_age)
325        eq_(r1.hello_time, self.hello_time)
326        eq_(r1.forward_delay, self.forward_delay)
327        eq_(r1._version_1_length, self.version_1_length)
328        eq_(r2, None)
329
330    def test_serialize(self):
331        data = bytearray()
332        prev = None
333        buf = self.msg.serialize(data, prev)
334        res = struct.unpack(self.fmt, buf)
335
336        eq_(res[0], self.protocol_id)
337        eq_(res[1], self.version_id)
338        eq_(res[2], self.bpdu_type)
339        eq_(res[3], self.flags)
340        eq_(bpdu.RstBPDUs._decode_bridge_id(res[4]),
341            (self.root_priority,
342             self.root_system_id_extension,
343             self.root_mac_address))
344        eq_(res[5], self.root_path_cost)
345        eq_(bpdu.RstBPDUs._decode_bridge_id(res[6]),
346            (self.bridge_priority,
347             self.bridge_system_id_extension,
348             self.bridge_mac_address))
349        eq_(bpdu.RstBPDUs._decode_port_id(res[7]),
350            (self.port_priority,
351             self.port_number))
352        eq_(bpdu.RstBPDUs._decode_timer(res[8]), self.message_age)
353        eq_(bpdu.RstBPDUs._decode_timer(res[9]), self.max_age)
354        eq_(bpdu.RstBPDUs._decode_timer(res[10]), self.hello_time)
355        eq_(bpdu.RstBPDUs._decode_timer(res[11]), self.forward_delay)
356        eq_(res[12], self.version_1_length)
357
358    def test_json(self):
359        jsondict = self.msg.to_jsondict()
360        msg = bpdu.RstBPDUs.from_jsondict(jsondict['RstBPDUs'])
361        eq_(str(self.msg), str(msg))
362
363
364class Test_UnknownVersion(unittest.TestCase):
365    """ Test case for unknown BPDU version
366    """
367
368    def setUp(self):
369        self.protocol_id = bpdu.PROTOCOL_IDENTIFIER
370        self.version_id = 111  # Unknown version
371        self.bpdu_type = bpdu.RstBPDUs.BPDU_TYPE
372        self.flags = 0b01111110
373        self.root_priority = 4096
374        self.root_system_id_extension = 1
375        self.root_mac_address = '12:34:56:78:9a:bc'
376        self.root_path_cost = 2
377        self.bridge_priority = 8192
378        self.bridge_system_id_extension = 3
379        self.bridge_mac_address = 'aa:aa:aa:aa:aa:aa'
380        self.port_priority = 16
381        self.port_number = 4
382        self.message_age = 5
383        self.max_age = 6
384        self.hello_time = 7
385        self.forward_delay = 8
386        self.version_1_length = bpdu.VERSION_1_LENGTH
387
388        self.fmt = (bpdu.bpdu._PACK_STR
389                    + bpdu.ConfigurationBPDUs._PACK_STR[1:]
390                    + bpdu.RstBPDUs._PACK_STR[1:])
391        self.buf = struct.pack(self.fmt,
392                               self.protocol_id, self.version_id,
393                               self.bpdu_type, self.flags,
394                               bpdu.RstBPDUs.encode_bridge_id(
395                                   self.root_priority,
396                                   self.root_system_id_extension,
397                                   self.root_mac_address),
398                               self.root_path_cost,
399                               bpdu.RstBPDUs.encode_bridge_id(
400                                   self.bridge_priority,
401                                   self.bridge_system_id_extension,
402                                   self.bridge_mac_address),
403                               bpdu.RstBPDUs.encode_port_id(
404                                   self.port_priority,
405                                   self.port_number),
406                               bpdu.RstBPDUs._encode_timer(self.message_age),
407                               bpdu.RstBPDUs._encode_timer(self.max_age),
408                               bpdu.RstBPDUs._encode_timer(self.hello_time),
409                               bpdu.RstBPDUs._encode_timer(self.forward_delay),
410                               self.version_1_length)
411
412    def test_parser(self):
413        r1, r2, _ = bpdu.bpdu.parser(self.buf)
414        eq_(r1, self.buf)
415        eq_(r2, None)
416
417
418class Test_UnknownType(unittest.TestCase):
419    """ Test case for unknown BPDU type
420    """
421
422    def setUp(self):
423        self.protocol_id = bpdu.PROTOCOL_IDENTIFIER
424        self.version_id = bpdu.RstBPDUs.VERSION_ID
425        self.bpdu_type = 222  # Unknown type
426        self.flags = 0b01111110
427        self.root_priority = 4096
428        self.root_system_id_extension = 1
429        self.root_mac_address = '12:34:56:78:9a:bc'
430        self.root_path_cost = 2
431        self.bridge_priority = 8192
432        self.bridge_system_id_extension = 3
433        self.bridge_mac_address = 'aa:aa:aa:aa:aa:aa'
434        self.port_priority = 16
435        self.port_number = 4
436        self.message_age = 5
437        self.max_age = 6
438        self.hello_time = 7
439        self.forward_delay = 8
440        self.version_1_length = bpdu.VERSION_1_LENGTH
441
442        self.fmt = (bpdu.bpdu._PACK_STR
443                    + bpdu.ConfigurationBPDUs._PACK_STR[1:]
444                    + bpdu.RstBPDUs._PACK_STR[1:])
445        self.buf = struct.pack(self.fmt,
446                               self.protocol_id, self.version_id,
447                               self.bpdu_type, self.flags,
448                               bpdu.RstBPDUs.encode_bridge_id(
449                                   self.root_priority,
450                                   self.root_system_id_extension,
451                                   self.root_mac_address),
452                               self.root_path_cost,
453                               bpdu.RstBPDUs.encode_bridge_id(
454                                   self.bridge_priority,
455                                   self.bridge_system_id_extension,
456                                   self.bridge_mac_address),
457                               bpdu.RstBPDUs.encode_port_id(
458                                   self.port_priority,
459                                   self.port_number),
460                               bpdu.RstBPDUs._encode_timer(self.message_age),
461                               bpdu.RstBPDUs._encode_timer(self.max_age),
462                               bpdu.RstBPDUs._encode_timer(self.hello_time),
463                               bpdu.RstBPDUs._encode_timer(self.forward_delay),
464                               self.version_1_length)
465
466    def test_parser(self):
467        r1, r2, _ = bpdu.bpdu.parser(self.buf)
468        eq_(r1, self.buf)
469        eq_(r2, None)
470