1# Copyright (c) 2003-2016 CORE Security Technologies 2# 3# This software is provided under under a slightly modified version 4# of the Apache Software License. See the accompanying LICENSE file 5# for more information. 6# 7# Description: 8# Cisco Discovery Protocol packet codecs. 9# 10# Author: 11# Martin Candurra 12# martincad at corest.com 13 14from struct import unpack 15import socket 16 17from ImpactPacket import Header 18from impacket import LOG 19 20IP_ADDRESS_LENGTH = 4 21 22class CDPTypes: 23 24 DeviceID_Type = 1 25 Address_Type = 2 26 PortID_Type = 3 27 Capabilities_Type = 4 28 SoftVersion_Type = 5 29 Platform_Type = 6 30 IPPrefix_Type = 7 31 ProtocolHello_Type = 8 32 MTU_Type = 17 33 SystemName_Type = 20 34 SystemObjectId_Type = 21 35 SnmpLocation = 23 36 37class CDP(Header): 38 39 Type = 0x2000 40 OUI = 0x00000c 41 42 def __init__(self, aBuffer = None): 43 Header.__init__(self, 8) 44 if aBuffer: 45 self.load_header(aBuffer) 46 self._elements = self._getElements(aBuffer) 47 48 def _getElements(self, aBuffer): 49 # Remove version (1 byte), TTL (1 byte), and checksum (2 bytes) 50 buff = aBuffer[4:] 51 l = [] 52 finish = False 53 while buff: 54 elem = CDPElementFactory.create(buff) 55 data = elem.get_data() 56 l.append( elem ) 57 buff = buff[ elem.get_length() : ] 58 return l 59 60 def get_header_size(self): 61 return 8 62 63 def get_version(self): 64 return self.get_byte(0) 65 66 def get_ttl(self): 67 return self.get_byte(1) 68 69 def get_checksum(self): 70 return self.get_word(2) 71 72 def get_type(self): 73 return self.get_word(4) 74 75 def get_lenght(self): 76 return self.get_word(6) 77 78 def getElements(self): 79 return self._elements 80 81 82 def __str__(self): 83 knowcode = 0 84 tmp_str = 'CDP Details:\n' 85 for element in self._elements: 86 tmp_str += "** Type:" + str(element.get_type()) + " " + str(element) + "\n" 87 return tmp_str 88 89 90def get_byte(buffer, offset): 91 return unpack("!B", buffer[offset:offset+1])[0] 92 93def get_word(buffer, offset): 94 return unpack("!h", buffer[offset:offset+2])[0] 95 96def get_long(buffer, offset): 97 return unpack("!I", buffer[offset:offset+4])[0] 98 99def get_bytes(buffer, offset, bytes): 100 return buffer[offset:offset + bytes] 101 102def mac_to_string(mac_bytes): 103 bytes = unpack('!BBBBBB', mac_bytes) 104 s = '' 105 for byte in bytes: 106 s += '%02x:' % byte 107 return s[0:-1] 108 109 110 111class CDPElement(Header): 112 113 def __init__(self, aBuffer = None): 114 Header.__init__(self, 8) 115 if aBuffer: 116 self._length = CDPElement.Get_length(aBuffer) 117 self.load_header( aBuffer[:self._length] ) 118 119 @classmethod 120 def Get_length(cls, aBuffer): 121 return unpack('!h', aBuffer[2:4])[0] 122 123 def get_header_size(self): 124 self._length 125 126 def get_length(self): 127 return self.get_word(2) 128 129 def get_data(self): 130 return self.get_bytes().tostring()[4:self.get_length()] 131 132 def get_ip_address(self, offset = 0, ip = None): 133 if not ip: 134 ip = self.get_bytes().tostring()[offset : offset + IP_ADDRESS_LENGTH] 135 return socket.inet_ntoa( ip ) 136 137class CDPDevice(CDPElement): 138 Type = 1 139 140 def get_type(self): 141 return CDPDevice.Type 142 143 def get_device_id(self): 144 return CDPElement.get_data(self) 145 146 def __str__(self): 147 return "Device:" + self.get_device_id() 148 149class Address(CDPElement): 150 Type = 2 151 152 def __init__(self, aBuffer = None): 153 CDPElement.__init__(self, aBuffer) 154 if aBuffer: 155 data = self.get_bytes().tostring()[8:] 156 self._generateAddressDetails(data) 157 158 def _generateAddressDetails(self, buff): 159 self.address_details = [] 160 while buff: 161 address = AddressDetails.create(buff) 162 self.address_details.append( address ) 163 buff = buff[address.get_total_length():] 164 165 def get_type(self): 166 return Address.Type 167 168 def get_number(self): 169 return self.get_long(4) 170 171 def get_address_details(self): 172 return self.address_details 173 174 def __str__(self): 175 tmp_str = "Addresses:" 176 for address_detail in self.address_details: 177 tmp_str += "\n" + str(address_detail) 178 return tmp_str 179 180class AddressDetails(): 181 182 PROTOCOL_IP = 0xcc 183 184 @classmethod 185 def create(cls, buff): 186 a = AddressDetails(buff) 187 return a 188 189 190 def __init__(self, aBuffer = None): 191 if aBuffer: 192 addr_length = unpack("!h", aBuffer[3:5])[0] 193 self.total_length = addr_length + 5 194 self.buffer = aBuffer[:self.total_length] 195 196 def get_total_length(self): 197 return self.total_length 198 199 def get_protocol_type(self): 200 return self.buffer[0:1] 201 202 def get_protocol_length(self): 203 return get_byte( self.buffer, 1) 204 205 def get_protocol(self): 206 return get_byte( self.buffer, 2) 207 208 def get_address_length(self): 209 return get_word( self.buffer, 3) 210 211 def get_address(self): 212 address = get_bytes( self.buffer, 5, self.get_address_length() ) 213 if self.get_protocol()==AddressDetails.PROTOCOL_IP: 214 return socket.inet_ntoa(address) 215 else: 216 LOG.error("Address not IP") 217 return address 218 219 def is_protocol_IP(self): 220 return self.get_protocol()==AddressDetails.PROTOCOL_IP 221 222 def __str__(self): 223 return "Protocol Type:%r Protocol:%r Address Length:%r Address:%s" % (self.get_protocol_type(), self.get_protocol(), self.get_address_length(), self.get_address()) 224 225class Port(CDPElement): 226 Type = 3 227 228 def get_type(self): 229 return Port.Type 230 231 def get_port(self): 232 return CDPElement.get_data(self) 233 234 def __str__(self): 235 return "Port:" + self.get_port() 236 237 238class Capabilities(CDPElement): 239 Type = 4 240 241 def __init__(self, aBuffer = None): 242 CDPElement.__init__(self, aBuffer) 243 self._capabilities_processed = False 244 245 self._router = False 246 self._transparent_bridge = False 247 self._source_route_bridge = False 248 self._switch = False 249 self._host = False 250 self._igmp_capable = False 251 self._repeater = False 252 self._init_capabilities() 253 254 def get_type(self): 255 return Capabilities.Type 256 257 def get_capabilities(self): 258 return CDPElement.get_data(self) 259 260 def _init_capabilities(self): 261 if self._capabilities_processed: 262 return 263 264 capabilities = unpack("!L", self.get_capabilities())[0] 265 self._router = (capabilities & 0x1) > 0 266 self._transparent_bridge = (capabilities & 0x02) > 0 267 self._source_route_bridge = (capabilities & 0x04) > 0 268 self._switch = (capabilities & 0x08) > 0 269 self._host = (capabilities & 0x10) > 0 270 self._igmp_capable = (capabilities & 0x20) > 0 271 self._repeater = (capabilities & 0x40) > 0 272 273 def is_router(self): 274 return self._router 275 276 def is_transparent_bridge(self): 277 return self._transparent_bridge 278 279 def is_source_route_bridge(self): 280 return self._source_route_bridge 281 282 def is_switch(self): 283 return self._switch 284 285 def is_host(self): 286 return self.is_host 287 288 def is_igmp_capable(self): 289 return self._igmp_capable 290 291 def is_repeater(self): 292 return self._repeater 293 294 295 def __str__(self): 296 return "Capabilities:" + self.get_capabilities() 297 298 299class SoftVersion(CDPElement): 300 Type = 5 301 302 def get_type(self): 303 return SoftVersion.Type 304 305 def get_version(self): 306 return CDPElement.get_data(self) 307 308 def __str__(self): 309 return "Version:" + self.get_version() 310 311 312class Platform(CDPElement): 313 Type = 6 314 315 def get_type(self): 316 return Platform.Type 317 318 def get_platform(self): 319 return CDPElement.get_data(self) 320 321 def __str__(self): 322 return "Platform:%r" % self.get_platform() 323 324 325class IpPrefix(CDPElement): 326 Type = 7 327 328 def get_type(self): 329 return IpPrefix .Type 330 331 def get_ip_prefix(self): 332 return CDPElement.get_ip_address(self, 4) 333 334 def get_bits(self): 335 return self.get_byte(8) 336 337 def __str__(self): 338 return "IP Prefix/Gateway: %r/%d" % (self.get_ip_prefix(), self.get_bits()) 339 340class ProtocolHello(CDPElement): 341 Type = 8 342 343 def get_type(self): 344 return ProtocolHello.Type 345 346 def get_master_ip(self): 347 return self.get_ip_address(9) 348 349 def get_version(self): 350 return self.get_byte(17) 351 352 def get_sub_version(self): 353 return self.get_byte(18) 354 355 def get_status(self): 356 return self.get_byte(19) 357 358 def get_cluster_command_mac(self): 359 return self.get_bytes().tostring()[20:20+6] 360 361 def get_switch_mac(self): 362 return self.get_bytes().tostring()[28:28+6] 363 364 def get_management_vlan(self): 365 return self.get_word(36) 366 367 def __str__(self): 368 return "\n\n\nProcolHello: Master IP:%s version:%r subversion:%r status:%r Switch's Mac:%r Management VLAN:%r" \ 369 % (self.get_master_ip(), self.get_version(), self.get_sub_version(), self.get_status(), mac_to_string(self.get_switch_mac()), self.get_management_vlan()) 370 371class VTPManagementDomain(CDPElement): 372 Type = 9 373 374 def get_type(self): 375 return VTPManagementDomain.Type 376 377 def get_domain(self): 378 return CDPElement.get_data(self) 379 380 381class Duplex(CDPElement): 382 Type = 0xb 383 384 def get_type(self): 385 return Duplex.Type 386 387 def get_duplex(self): 388 return CDPElement.get_data(self) 389 390 def is_full_duplex(self): 391 return self.get_duplex()==0x1 392 393class VLAN(CDPElement): 394 Type = 0xa 395 396 def get_type(self): 397 return VLAN.Type 398 399 def get_vlan_number(self): 400 return CDPElement.get_data(self) 401 402 403 404class TrustBitmap(CDPElement): 405 Type = 0x12 406 407 def get_type(self): 408 return TrustBitmap.Type 409 410 def get_trust_bitmap(self): 411 return self.get_data() 412 413 def __str__(self): 414 return "TrustBitmap Trust Bitmap:%r" % self.get_trust_bitmap() 415 416class UntrustedPortCoS(CDPElement): 417 Type = 0x13 418 419 def get_type(self): 420 return UntrustedPortCoS.Type 421 422 def get_port_CoS(self): 423 return self.get_data() 424 425 def __str__(self): 426 return "UntrustedPortCoS port CoS %r" % self.get_port_CoS() 427 428class ManagementAddresses(Address): 429 Type = 0x16 430 431 def get_type(self): 432 return ManagementAddresses.Type 433 434class MTU(CDPElement): 435 Type = 0x11 436 437 def get_type(self): 438 return MTU.Type 439 440class SystemName(CDPElement): 441 Type = 0x14 442 443 def get_type(self): 444 return SystemName.Type 445 446class SystemObjectId(CDPElement): 447 Type = 0x15 448 449 def get_type(self): 450 return SystemObjectId.Type 451 452class SnmpLocation(CDPElement): 453 Type = 0x17 454 455 def get_type(self): 456 return SnmpLocation.Type 457 458 459class DummyCdpElement(CDPElement): 460 Type = 0x99 461 462 def get_type(self): 463 return DummyCdpElement.Type 464 465class CDPElementFactory(): 466 467 elementTypeMap = { 468 CDPDevice.Type : CDPDevice, 469 Port.Type : Port, 470 Capabilities.Type : Capabilities, 471 Address.Type : Address, 472 SoftVersion.Type : SoftVersion, 473 Platform.Type : Platform, 474 IpPrefix.Type : IpPrefix, 475 ProtocolHello.Type : ProtocolHello, 476 VTPManagementDomain.Type : VTPManagementDomain, 477 VLAN.Type : VLAN, 478 Duplex.Type : Duplex, 479 TrustBitmap.Type : TrustBitmap, 480 UntrustedPortCoS.Type : UntrustedPortCoS, 481 ManagementAddresses.Type : ManagementAddresses, 482 MTU.Type : MTU, 483 SystemName.Type : SystemName, 484 SystemObjectId.Type : SystemObjectId, 485 SnmpLocation.Type : SnmpLocation 486 } 487 488 @classmethod 489 def create(cls, aBuffer): 490# print "CDPElementFactory.create aBuffer:", repr(aBuffer) 491# print "CDPElementFactory.create sub_type:", repr(aBuffer[0:2]) 492 _type = unpack("!h", aBuffer[0:2])[0] 493# print "CDPElementFactory.create _type:", _type 494 try: 495 class_type = cls.elementTypeMap[_type] 496 except KeyError: 497 class_type = DummyCdpElement 498 #raise Exception("CDP Element type %s not implemented" % _type) 499 return class_type( aBuffer ) 500