1# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 12# implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16# vim: tabstop=4 shiftwidth=4 softtabstop=4 17 18import unittest 19import logging 20import struct 21 22from nose.tools import eq_ 23from ryu.lib.packet import bpdu 24 25 26LOG = logging.getLogger(__name__) 27 28 29class Test_ConfigurationBPDUs(unittest.TestCase): 30 """ Test case for ConfigurationBPDUs 31 """ 32 33 def setUp(self): 34 self.protocol_id = bpdu.PROTOCOL_IDENTIFIER 35 self.version_id = bpdu.ConfigurationBPDUs.VERSION_ID 36 self.bpdu_type = bpdu.ConfigurationBPDUs.BPDU_TYPE 37 self.flags = 0b00000001 38 self.root_priority = 4096 39 self.root_system_id_extension = 1 40 self.root_mac_address = '12:34:56:78:9a:bc' 41 self.root_path_cost = 2 42 self.bridge_priority = 8192 43 self.bridge_system_id_extension = 3 44 self.bridge_mac_address = 'aa:aa:aa:aa:aa:aa' 45 self.port_priority = 16 46 self.port_number = 4 47 self.message_age = 5 48 self.max_age = 6 49 self.hello_time = 7 50 self.forward_delay = 8 51 52 self.msg = bpdu.ConfigurationBPDUs( 53 flags=self.flags, 54 root_priority=self.root_priority, 55 root_system_id_extension=self.root_system_id_extension, 56 root_mac_address=self.root_mac_address, 57 root_path_cost=self.root_path_cost, 58 bridge_priority=self.bridge_priority, 59 bridge_system_id_extension=self.bridge_system_id_extension, 60 bridge_mac_address=self.bridge_mac_address, 61 port_priority=self.port_priority, 62 port_number=self.port_number, 63 message_age=self.message_age, 64 max_age=self.max_age, 65 hello_time=self.hello_time, 66 forward_delay=self.forward_delay) 67 68 self.fmt = (bpdu.bpdu._PACK_STR 69 + bpdu.ConfigurationBPDUs._PACK_STR[1:]) 70 self.buf = struct.pack(self.fmt, 71 self.protocol_id, self.version_id, 72 self.bpdu_type, self.flags, 73 bpdu.ConfigurationBPDUs.encode_bridge_id( 74 self.root_priority, 75 self.root_system_id_extension, 76 self.root_mac_address), 77 self.root_path_cost, 78 bpdu.ConfigurationBPDUs.encode_bridge_id( 79 self.bridge_priority, 80 self.bridge_system_id_extension, 81 self.bridge_mac_address), 82 bpdu.ConfigurationBPDUs.encode_port_id( 83 self.port_priority, 84 self.port_number), 85 bpdu.ConfigurationBPDUs._encode_timer( 86 self.message_age), 87 bpdu.ConfigurationBPDUs._encode_timer( 88 self.max_age), 89 bpdu.ConfigurationBPDUs._encode_timer( 90 self.hello_time), 91 bpdu.ConfigurationBPDUs._encode_timer( 92 self.forward_delay)) 93 94 def test_init(self): 95 eq_(self.protocol_id, self.msg._protocol_id) 96 eq_(self.version_id, self.msg._version_id) 97 eq_(self.bpdu_type, self.msg._bpdu_type) 98 eq_(self.flags, self.msg.flags) 99 eq_(self.root_priority, self.msg.root_priority) 100 eq_(self.root_system_id_extension, 101 self.msg.root_system_id_extension) 102 eq_(self.root_mac_address, self.msg.root_mac_address) 103 eq_(self.root_path_cost, self.msg.root_path_cost) 104 eq_(self.bridge_priority, self.msg.bridge_priority) 105 eq_(self.bridge_system_id_extension, 106 self.msg.bridge_system_id_extension) 107 eq_(self.bridge_mac_address, self.msg.bridge_mac_address) 108 eq_(self.port_priority, self.msg.port_priority) 109 eq_(self.port_number, self.msg.port_number) 110 eq_(self.message_age, self.msg.message_age) 111 eq_(self.max_age, self.msg.max_age) 112 eq_(self.hello_time, self.msg.hello_time) 113 eq_(self.forward_delay, self.msg.forward_delay) 114 115 def test_parser(self): 116 r1, r2, _ = bpdu.bpdu.parser(self.buf) 117 118 eq_(type(r1), type(self.msg)) 119 eq_(r1._protocol_id, self.protocol_id) 120 eq_(r1._version_id, self.version_id) 121 eq_(r1._bpdu_type, self.bpdu_type) 122 eq_(r1.flags, self.flags) 123 eq_(r1.root_priority, self.root_priority) 124 eq_(r1.root_system_id_extension, self.root_system_id_extension) 125 eq_(r1.root_mac_address, self.root_mac_address) 126 eq_(r1.root_path_cost, self.root_path_cost) 127 eq_(r1.bridge_priority, self.bridge_priority) 128 eq_(r1.bridge_system_id_extension, self.bridge_system_id_extension) 129 eq_(r1.bridge_mac_address, self.bridge_mac_address) 130 eq_(r1.port_priority, self.port_priority) 131 eq_(r1.port_number, self.port_number) 132 eq_(r1.message_age, self.message_age) 133 eq_(r1.max_age, self.max_age) 134 eq_(r1.hello_time, self.hello_time) 135 eq_(r1.forward_delay, self.forward_delay) 136 eq_(r2, None) 137 138 def test_serialize(self): 139 data = bytearray() 140 prev = None 141 buf = self.msg.serialize(data, prev) 142 res = struct.unpack(self.fmt, buf) 143 144 eq_(res[0], self.protocol_id) 145 eq_(res[1], self.version_id) 146 eq_(res[2], self.bpdu_type) 147 eq_(res[3], self.flags) 148 eq_(bpdu.ConfigurationBPDUs._decode_bridge_id(res[4]), 149 (self.root_priority, 150 self.root_system_id_extension, 151 self.root_mac_address)) 152 eq_(res[5], self.root_path_cost) 153 eq_(bpdu.ConfigurationBPDUs._decode_bridge_id(res[6]), 154 (self.bridge_priority, 155 self.bridge_system_id_extension, 156 self.bridge_mac_address)) 157 eq_(bpdu.ConfigurationBPDUs._decode_port_id(res[7]), 158 (self.port_priority, 159 self.port_number)) 160 eq_(bpdu.ConfigurationBPDUs._decode_timer(res[8]), self.message_age) 161 eq_(bpdu.ConfigurationBPDUs._decode_timer(res[9]), self.max_age) 162 eq_(bpdu.ConfigurationBPDUs._decode_timer(res[10]), self.hello_time) 163 eq_(bpdu.ConfigurationBPDUs._decode_timer(res[11]), self.forward_delay) 164 165 def test_json(self): 166 jsondict = self.msg.to_jsondict() 167 msg = bpdu.ConfigurationBPDUs.from_jsondict( 168 jsondict['ConfigurationBPDUs']) 169 eq_(str(self.msg), str(msg)) 170 171 172class Test_TopologyChangeNotificationBPDUs(unittest.TestCase): 173 """ Test case for TopologyChangeNotificationBPDUs 174 """ 175 176 def setUp(self): 177 self.protocol_id = bpdu.PROTOCOL_IDENTIFIER 178 self.version_id = bpdu.TopologyChangeNotificationBPDUs.VERSION_ID 179 self.bpdu_type = bpdu.TopologyChangeNotificationBPDUs.BPDU_TYPE 180 181 self.msg = bpdu.TopologyChangeNotificationBPDUs() 182 183 self.fmt = bpdu.bpdu._PACK_STR 184 self.buf = struct.pack(self.fmt, 185 self.protocol_id, 186 self.version_id, 187 self.bpdu_type) 188 189 def test_init(self): 190 eq_(self.protocol_id, self.msg._protocol_id) 191 eq_(self.version_id, self.msg._version_id) 192 eq_(self.bpdu_type, self.msg._bpdu_type) 193 194 def test_parser(self): 195 r1, r2, _ = bpdu.bpdu.parser(self.buf) 196 197 eq_(type(r1), type(self.msg)) 198 eq_(r1._protocol_id, self.protocol_id) 199 eq_(r1._version_id, self.version_id) 200 eq_(r1._bpdu_type, self.bpdu_type) 201 eq_(r2, None) 202 203 def test_serialize(self): 204 data = bytearray() 205 prev = None 206 buf = self.msg.serialize(data, prev) 207 res = struct.unpack(self.fmt, buf) 208 209 eq_(res[0], self.protocol_id) 210 eq_(res[1], self.version_id) 211 eq_(res[2], self.bpdu_type) 212 213 def test_json(self): 214 jsondict = self.msg.to_jsondict() 215 msg = bpdu.TopologyChangeNotificationBPDUs.from_jsondict( 216 jsondict['TopologyChangeNotificationBPDUs']) 217 eq_(str(self.msg), str(msg)) 218 219 220class Test_RstBPDUs(unittest.TestCase): 221 """ Test case for RstBPDUs 222 """ 223 224 def setUp(self): 225 self.protocol_id = bpdu.PROTOCOL_IDENTIFIER 226 self.version_id = bpdu.RstBPDUs.VERSION_ID 227 self.bpdu_type = bpdu.RstBPDUs.BPDU_TYPE 228 self.flags = 0b01111110 229 self.root_priority = 4096 230 self.root_system_id_extension = 1 231 self.root_mac_address = '12:34:56:78:9a:bc' 232 self.root_path_cost = 2 233 self.bridge_priority = 8192 234 self.bridge_system_id_extension = 3 235 self.bridge_mac_address = 'aa:aa:aa:aa:aa:aa' 236 self.port_priority = 16 237 self.port_number = 4 238 self.message_age = 5 239 self.max_age = 6 240 self.hello_time = 7 241 self.forward_delay = 8 242 self.version_1_length = bpdu.VERSION_1_LENGTH 243 244 self.msg = bpdu.RstBPDUs( 245 flags=self.flags, 246 root_priority=self.root_priority, 247 root_system_id_extension=self.root_system_id_extension, 248 root_mac_address=self.root_mac_address, 249 root_path_cost=self.root_path_cost, 250 bridge_priority=self.bridge_priority, 251 bridge_system_id_extension=self.bridge_system_id_extension, 252 bridge_mac_address=self.bridge_mac_address, 253 port_priority=self.port_priority, 254 port_number=self.port_number, 255 message_age=self.message_age, 256 max_age=self.max_age, 257 hello_time=self.hello_time, 258 forward_delay=self.forward_delay) 259 260 self.fmt = (bpdu.bpdu._PACK_STR 261 + bpdu.ConfigurationBPDUs._PACK_STR[1:] 262 + bpdu.RstBPDUs._PACK_STR[1:]) 263 self.buf = struct.pack(self.fmt, 264 self.protocol_id, self.version_id, 265 self.bpdu_type, self.flags, 266 bpdu.RstBPDUs.encode_bridge_id( 267 self.root_priority, 268 self.root_system_id_extension, 269 self.root_mac_address), 270 self.root_path_cost, 271 bpdu.RstBPDUs.encode_bridge_id( 272 self.bridge_priority, 273 self.bridge_system_id_extension, 274 self.bridge_mac_address), 275 bpdu.RstBPDUs.encode_port_id( 276 self.port_priority, 277 self.port_number), 278 bpdu.RstBPDUs._encode_timer(self.message_age), 279 bpdu.RstBPDUs._encode_timer(self.max_age), 280 bpdu.RstBPDUs._encode_timer(self.hello_time), 281 bpdu.RstBPDUs._encode_timer(self.forward_delay), 282 self.version_1_length) 283 284 def test_init(self): 285 eq_(self.protocol_id, self.msg._protocol_id) 286 eq_(self.version_id, self.msg._version_id) 287 eq_(self.bpdu_type, self.msg._bpdu_type) 288 eq_(self.flags, self.msg.flags) 289 eq_(self.root_priority, self.msg.root_priority) 290 eq_(self.root_system_id_extension, 291 self.msg.root_system_id_extension) 292 eq_(self.root_mac_address, self.msg.root_mac_address) 293 eq_(self.root_path_cost, self.msg.root_path_cost) 294 eq_(self.bridge_priority, self.msg.bridge_priority) 295 eq_(self.bridge_system_id_extension, 296 self.msg.bridge_system_id_extension) 297 eq_(self.bridge_mac_address, self.msg.bridge_mac_address) 298 eq_(self.port_priority, self.msg.port_priority) 299 eq_(self.port_number, self.msg.port_number) 300 eq_(self.message_age, self.msg.message_age) 301 eq_(self.max_age, self.msg.max_age) 302 eq_(self.hello_time, self.msg.hello_time) 303 eq_(self.forward_delay, self.msg.forward_delay) 304 eq_(self.version_1_length, self.msg._version_1_length) 305 306 def test_parser(self): 307 r1, r2, _ = bpdu.bpdu.parser(self.buf) 308 309 eq_(type(r1), type(self.msg)) 310 eq_(r1._protocol_id, self.protocol_id) 311 eq_(r1._version_id, self.version_id) 312 eq_(r1._bpdu_type, self.bpdu_type) 313 eq_(r1.flags, self.flags) 314 eq_(r1.root_priority, self.root_priority) 315 eq_(r1.root_system_id_extension, self.root_system_id_extension) 316 eq_(r1.root_mac_address, self.root_mac_address) 317 eq_(r1.root_path_cost, self.root_path_cost) 318 eq_(r1.bridge_priority, self.bridge_priority) 319 eq_(r1.bridge_system_id_extension, self.bridge_system_id_extension) 320 eq_(r1.bridge_mac_address, self.bridge_mac_address) 321 eq_(r1.port_priority, self.port_priority) 322 eq_(r1.port_number, self.port_number) 323 eq_(r1.message_age, self.message_age) 324 eq_(r1.max_age, self.max_age) 325 eq_(r1.hello_time, self.hello_time) 326 eq_(r1.forward_delay, self.forward_delay) 327 eq_(r1._version_1_length, self.version_1_length) 328 eq_(r2, None) 329 330 def test_serialize(self): 331 data = bytearray() 332 prev = None 333 buf = self.msg.serialize(data, prev) 334 res = struct.unpack(self.fmt, buf) 335 336 eq_(res[0], self.protocol_id) 337 eq_(res[1], self.version_id) 338 eq_(res[2], self.bpdu_type) 339 eq_(res[3], self.flags) 340 eq_(bpdu.RstBPDUs._decode_bridge_id(res[4]), 341 (self.root_priority, 342 self.root_system_id_extension, 343 self.root_mac_address)) 344 eq_(res[5], self.root_path_cost) 345 eq_(bpdu.RstBPDUs._decode_bridge_id(res[6]), 346 (self.bridge_priority, 347 self.bridge_system_id_extension, 348 self.bridge_mac_address)) 349 eq_(bpdu.RstBPDUs._decode_port_id(res[7]), 350 (self.port_priority, 351 self.port_number)) 352 eq_(bpdu.RstBPDUs._decode_timer(res[8]), self.message_age) 353 eq_(bpdu.RstBPDUs._decode_timer(res[9]), self.max_age) 354 eq_(bpdu.RstBPDUs._decode_timer(res[10]), self.hello_time) 355 eq_(bpdu.RstBPDUs._decode_timer(res[11]), self.forward_delay) 356 eq_(res[12], self.version_1_length) 357 358 def test_json(self): 359 jsondict = self.msg.to_jsondict() 360 msg = bpdu.RstBPDUs.from_jsondict(jsondict['RstBPDUs']) 361 eq_(str(self.msg), str(msg)) 362 363 364class Test_UnknownVersion(unittest.TestCase): 365 """ Test case for unknown BPDU version 366 """ 367 368 def setUp(self): 369 self.protocol_id = bpdu.PROTOCOL_IDENTIFIER 370 self.version_id = 111 # Unknown version 371 self.bpdu_type = bpdu.RstBPDUs.BPDU_TYPE 372 self.flags = 0b01111110 373 self.root_priority = 4096 374 self.root_system_id_extension = 1 375 self.root_mac_address = '12:34:56:78:9a:bc' 376 self.root_path_cost = 2 377 self.bridge_priority = 8192 378 self.bridge_system_id_extension = 3 379 self.bridge_mac_address = 'aa:aa:aa:aa:aa:aa' 380 self.port_priority = 16 381 self.port_number = 4 382 self.message_age = 5 383 self.max_age = 6 384 self.hello_time = 7 385 self.forward_delay = 8 386 self.version_1_length = bpdu.VERSION_1_LENGTH 387 388 self.fmt = (bpdu.bpdu._PACK_STR 389 + bpdu.ConfigurationBPDUs._PACK_STR[1:] 390 + bpdu.RstBPDUs._PACK_STR[1:]) 391 self.buf = struct.pack(self.fmt, 392 self.protocol_id, self.version_id, 393 self.bpdu_type, self.flags, 394 bpdu.RstBPDUs.encode_bridge_id( 395 self.root_priority, 396 self.root_system_id_extension, 397 self.root_mac_address), 398 self.root_path_cost, 399 bpdu.RstBPDUs.encode_bridge_id( 400 self.bridge_priority, 401 self.bridge_system_id_extension, 402 self.bridge_mac_address), 403 bpdu.RstBPDUs.encode_port_id( 404 self.port_priority, 405 self.port_number), 406 bpdu.RstBPDUs._encode_timer(self.message_age), 407 bpdu.RstBPDUs._encode_timer(self.max_age), 408 bpdu.RstBPDUs._encode_timer(self.hello_time), 409 bpdu.RstBPDUs._encode_timer(self.forward_delay), 410 self.version_1_length) 411 412 def test_parser(self): 413 r1, r2, _ = bpdu.bpdu.parser(self.buf) 414 eq_(r1, self.buf) 415 eq_(r2, None) 416 417 418class Test_UnknownType(unittest.TestCase): 419 """ Test case for unknown BPDU type 420 """ 421 422 def setUp(self): 423 self.protocol_id = bpdu.PROTOCOL_IDENTIFIER 424 self.version_id = bpdu.RstBPDUs.VERSION_ID 425 self.bpdu_type = 222 # Unknown type 426 self.flags = 0b01111110 427 self.root_priority = 4096 428 self.root_system_id_extension = 1 429 self.root_mac_address = '12:34:56:78:9a:bc' 430 self.root_path_cost = 2 431 self.bridge_priority = 8192 432 self.bridge_system_id_extension = 3 433 self.bridge_mac_address = 'aa:aa:aa:aa:aa:aa' 434 self.port_priority = 16 435 self.port_number = 4 436 self.message_age = 5 437 self.max_age = 6 438 self.hello_time = 7 439 self.forward_delay = 8 440 self.version_1_length = bpdu.VERSION_1_LENGTH 441 442 self.fmt = (bpdu.bpdu._PACK_STR 443 + bpdu.ConfigurationBPDUs._PACK_STR[1:] 444 + bpdu.RstBPDUs._PACK_STR[1:]) 445 self.buf = struct.pack(self.fmt, 446 self.protocol_id, self.version_id, 447 self.bpdu_type, self.flags, 448 bpdu.RstBPDUs.encode_bridge_id( 449 self.root_priority, 450 self.root_system_id_extension, 451 self.root_mac_address), 452 self.root_path_cost, 453 bpdu.RstBPDUs.encode_bridge_id( 454 self.bridge_priority, 455 self.bridge_system_id_extension, 456 self.bridge_mac_address), 457 bpdu.RstBPDUs.encode_port_id( 458 self.port_priority, 459 self.port_number), 460 bpdu.RstBPDUs._encode_timer(self.message_age), 461 bpdu.RstBPDUs._encode_timer(self.max_age), 462 bpdu.RstBPDUs._encode_timer(self.hello_time), 463 bpdu.RstBPDUs._encode_timer(self.forward_delay), 464 self.version_1_length) 465 466 def test_parser(self): 467 r1, r2, _ = bpdu.bpdu.parser(self.buf) 468 eq_(r1, self.buf) 469 eq_(r2, None) 470