1# Copyright (c) 2003-2016 CORE Security Technologies
2#
3# This software is provided under under a slightly modified version
4# of the Apache Software License. See the accompanying LICENSE file
5# for more information.
6#
7# Description:
8#    Cisco Discovery Protocol packet codecs.
9#
10# Author:
11#  Martin Candurra
12#  martincad at corest.com
13
14from struct import unpack
15import socket
16
17from ImpactPacket import Header
18from impacket import LOG
19
20IP_ADDRESS_LENGTH = 4
21
22class CDPTypes:
23
24    DeviceID_Type       = 1
25    Address_Type        = 2
26    PortID_Type         = 3
27    Capabilities_Type   = 4
28    SoftVersion_Type    = 5
29    Platform_Type       = 6
30    IPPrefix_Type       = 7
31    ProtocolHello_Type  = 8
32    MTU_Type            = 17
33    SystemName_Type     = 20
34    SystemObjectId_Type = 21
35    SnmpLocation        = 23
36
37class CDP(Header):
38
39    Type = 0x2000
40    OUI =  0x00000c
41
42    def __init__(self, aBuffer = None):
43        Header.__init__(self, 8)
44        if aBuffer:
45            self.load_header(aBuffer)
46            self._elements = self._getElements(aBuffer)
47
48    def _getElements(self, aBuffer):
49        # Remove version (1 byte), TTL (1 byte), and checksum (2 bytes)
50        buff = aBuffer[4:]
51        l = []
52        finish = False
53        while buff:
54            elem = CDPElementFactory.create(buff)
55            data = elem.get_data()
56            l.append( elem )
57            buff = buff[ elem.get_length() : ]
58        return l
59
60    def get_header_size(self):
61        return 8
62
63    def get_version(self):
64        return self.get_byte(0)
65
66    def get_ttl(self):
67        return self.get_byte(1)
68
69    def get_checksum(self):
70        return self.get_word(2)
71
72    def get_type(self):
73        return self.get_word(4)
74
75    def get_lenght(self):
76        return self.get_word(6)
77
78    def getElements(self):
79        return self._elements
80
81
82    def __str__(self):
83        knowcode = 0
84        tmp_str = 'CDP Details:\n'
85        for element in self._elements:
86            tmp_str += "** Type:" + str(element.get_type()) + " " + str(element) + "\n"
87        return tmp_str
88
89
90def get_byte(buffer, offset):
91    return unpack("!B", buffer[offset:offset+1])[0]
92
93def get_word(buffer, offset):
94    return unpack("!h", buffer[offset:offset+2])[0]
95
96def get_long(buffer, offset):
97    return unpack("!I", buffer[offset:offset+4])[0]
98
99def get_bytes(buffer, offset, bytes):
100    return buffer[offset:offset + bytes]
101
102def mac_to_string(mac_bytes):
103    bytes = unpack('!BBBBBB', mac_bytes)
104    s = ''
105    for byte in bytes:
106        s += '%02x:' % byte
107    return s[0:-1]
108
109
110
111class CDPElement(Header):
112
113    def __init__(self, aBuffer = None):
114        Header.__init__(self, 8)
115        if aBuffer:
116            self._length = CDPElement.Get_length(aBuffer)
117            self.load_header( aBuffer[:self._length] )
118
119    @classmethod
120    def Get_length(cls, aBuffer):
121        return unpack('!h', aBuffer[2:4])[0]
122
123    def get_header_size(self):
124        self._length
125
126    def get_length(self):
127        return self.get_word(2)
128
129    def get_data(self):
130        return self.get_bytes().tostring()[4:self.get_length()]
131
132    def get_ip_address(self, offset = 0, ip = None):
133        if not ip:
134            ip = self.get_bytes().tostring()[offset : offset + IP_ADDRESS_LENGTH]
135        return socket.inet_ntoa( ip )
136
137class CDPDevice(CDPElement):
138    Type = 1
139
140    def get_type(self):
141        return CDPDevice.Type
142
143    def get_device_id(self):
144        return CDPElement.get_data(self)
145
146    def __str__(self):
147        return "Device:" + self.get_device_id()
148
149class Address(CDPElement):
150    Type = 2
151
152    def __init__(self, aBuffer = None):
153        CDPElement.__init__(self, aBuffer)
154        if aBuffer:
155            data = self.get_bytes().tostring()[8:]
156            self._generateAddressDetails(data)
157
158    def _generateAddressDetails(self, buff):
159        self.address_details = []
160        while buff:
161            address = AddressDetails.create(buff)
162            self.address_details.append( address )
163            buff = buff[address.get_total_length():]
164
165    def get_type(self):
166        return Address.Type
167
168    def get_number(self):
169        return self.get_long(4)
170
171    def get_address_details(self):
172        return self.address_details
173
174    def __str__(self):
175        tmp_str = "Addresses:"
176        for address_detail in self.address_details:
177            tmp_str += "\n" + str(address_detail)
178        return tmp_str
179
180class AddressDetails():
181
182    PROTOCOL_IP = 0xcc
183
184    @classmethod
185    def create(cls, buff):
186        a = AddressDetails(buff)
187        return a
188
189
190    def __init__(self, aBuffer = None):
191        if aBuffer:
192            addr_length = unpack("!h", aBuffer[3:5])[0]
193            self.total_length = addr_length + 5
194            self.buffer = aBuffer[:self.total_length]
195
196    def get_total_length(self):
197        return self.total_length
198
199    def get_protocol_type(self):
200        return self.buffer[0:1]
201
202    def get_protocol_length(self):
203        return get_byte( self.buffer, 1)
204
205    def get_protocol(self):
206        return get_byte( self.buffer, 2)
207
208    def get_address_length(self):
209        return get_word( self.buffer, 3)
210
211    def get_address(self):
212        address =  get_bytes( self.buffer, 5, self.get_address_length() )
213        if  self.get_protocol()==AddressDetails.PROTOCOL_IP:
214            return socket.inet_ntoa(address)
215        else:
216            LOG.error("Address not IP")
217            return address
218
219    def is_protocol_IP(self):
220        return self.get_protocol()==AddressDetails.PROTOCOL_IP
221
222    def __str__(self):
223        return "Protocol Type:%r Protocol:%r Address Length:%r Address:%s" % (self.get_protocol_type(), self.get_protocol(), self.get_address_length(), self.get_address())
224
225class Port(CDPElement):
226    Type = 3
227
228    def get_type(self):
229        return Port.Type
230
231    def get_port(self):
232        return CDPElement.get_data(self)
233
234    def __str__(self):
235        return "Port:" + self.get_port()
236
237
238class Capabilities(CDPElement):
239    Type = 4
240
241    def __init__(self, aBuffer = None):
242        CDPElement.__init__(self, aBuffer)
243        self._capabilities_processed = False
244
245        self._router = False
246        self._transparent_bridge = False
247        self._source_route_bridge = False
248        self._switch = False
249        self._host = False
250        self._igmp_capable = False
251        self._repeater = False
252        self._init_capabilities()
253
254    def get_type(self):
255        return Capabilities.Type
256
257    def get_capabilities(self):
258        return CDPElement.get_data(self)
259
260    def _init_capabilities(self):
261        if self._capabilities_processed:
262            return
263
264        capabilities = unpack("!L", self.get_capabilities())[0]
265        self._router = (capabilities & 0x1) > 0
266        self._transparent_bridge = (capabilities & 0x02) > 0
267        self._source_route_bridge = (capabilities & 0x04) > 0
268        self._switch = (capabilities & 0x08) > 0
269        self._host = (capabilities & 0x10) > 0
270        self._igmp_capable = (capabilities & 0x20) > 0
271        self._repeater = (capabilities & 0x40) > 0
272
273    def is_router(self):
274        return self._router
275
276    def is_transparent_bridge(self):
277        return self._transparent_bridge
278
279    def is_source_route_bridge(self):
280        return self._source_route_bridge
281
282    def is_switch(self):
283        return self._switch
284
285    def is_host(self):
286        return self.is_host
287
288    def is_igmp_capable(self):
289        return self._igmp_capable
290
291    def is_repeater(self):
292        return self._repeater
293
294
295    def __str__(self):
296        return "Capabilities:" + self.get_capabilities()
297
298
299class SoftVersion(CDPElement):
300    Type = 5
301
302    def get_type(self):
303        return SoftVersion.Type
304
305    def get_version(self):
306        return CDPElement.get_data(self)
307
308    def __str__(self):
309        return "Version:" + self.get_version()
310
311
312class Platform(CDPElement):
313    Type = 6
314
315    def get_type(self):
316        return Platform.Type
317
318    def get_platform(self):
319        return CDPElement.get_data(self)
320
321    def __str__(self):
322        return "Platform:%r" % self.get_platform()
323
324
325class IpPrefix(CDPElement):
326    Type = 7
327
328    def get_type(self):
329        return IpPrefix .Type
330
331    def get_ip_prefix(self):
332        return CDPElement.get_ip_address(self, 4)
333
334    def get_bits(self):
335        return self.get_byte(8)
336
337    def __str__(self):
338        return "IP Prefix/Gateway: %r/%d" % (self.get_ip_prefix(), self.get_bits())
339
340class ProtocolHello(CDPElement):
341    Type = 8
342
343    def get_type(self):
344        return ProtocolHello.Type
345
346    def get_master_ip(self):
347        return self.get_ip_address(9)
348
349    def get_version(self):
350        return self.get_byte(17)
351
352    def get_sub_version(self):
353        return self.get_byte(18)
354
355    def get_status(self):
356        return self.get_byte(19)
357
358    def get_cluster_command_mac(self):
359        return self.get_bytes().tostring()[20:20+6]
360
361    def get_switch_mac(self):
362        return self.get_bytes().tostring()[28:28+6]
363
364    def get_management_vlan(self):
365        return self.get_word(36)
366
367    def __str__(self):
368        return "\n\n\nProcolHello: Master IP:%s version:%r subversion:%r status:%r Switch's Mac:%r Management VLAN:%r" \
369         % (self.get_master_ip(), self.get_version(), self.get_sub_version(), self.get_status(), mac_to_string(self.get_switch_mac()), self.get_management_vlan())
370
371class VTPManagementDomain(CDPElement):
372    Type = 9
373
374    def get_type(self):
375        return VTPManagementDomain.Type
376
377    def get_domain(self):
378        return CDPElement.get_data(self)
379
380
381class Duplex(CDPElement):
382    Type = 0xb
383
384    def get_type(self):
385        return Duplex.Type
386
387    def get_duplex(self):
388        return CDPElement.get_data(self)
389
390    def is_full_duplex(self):
391        return self.get_duplex()==0x1
392
393class VLAN(CDPElement):
394    Type = 0xa
395
396    def get_type(self):
397        return VLAN.Type
398
399    def get_vlan_number(self):
400        return CDPElement.get_data(self)
401
402
403
404class TrustBitmap(CDPElement):
405    Type = 0x12
406
407    def get_type(self):
408        return TrustBitmap.Type
409
410    def get_trust_bitmap(self):
411        return self.get_data()
412
413    def __str__(self):
414        return "TrustBitmap Trust Bitmap:%r" % self.get_trust_bitmap()
415
416class UntrustedPortCoS(CDPElement):
417    Type = 0x13
418
419    def get_type(self):
420        return UntrustedPortCoS.Type
421
422    def get_port_CoS(self):
423        return self.get_data()
424
425    def __str__(self):
426        return "UntrustedPortCoS port CoS %r" % self.get_port_CoS()
427
428class ManagementAddresses(Address):
429    Type = 0x16
430
431    def get_type(self):
432        return ManagementAddresses.Type
433
434class MTU(CDPElement):
435    Type = 0x11
436
437    def get_type(self):
438        return MTU.Type
439
440class SystemName(CDPElement):
441    Type = 0x14
442
443    def get_type(self):
444        return SystemName.Type
445
446class SystemObjectId(CDPElement):
447    Type = 0x15
448
449    def get_type(self):
450        return SystemObjectId.Type
451
452class SnmpLocation(CDPElement):
453    Type = 0x17
454
455    def get_type(self):
456        return SnmpLocation.Type
457
458
459class DummyCdpElement(CDPElement):
460    Type = 0x99
461
462    def get_type(self):
463        return DummyCdpElement.Type
464
465class CDPElementFactory():
466
467    elementTypeMap = {
468                        CDPDevice.Type            : CDPDevice,
469                        Port.Type                 : Port,
470                        Capabilities.Type         : Capabilities,
471                        Address.Type              : Address,
472                        SoftVersion.Type          : SoftVersion,
473                        Platform.Type             : Platform,
474                        IpPrefix.Type             : IpPrefix,
475                        ProtocolHello.Type        : ProtocolHello,
476                        VTPManagementDomain.Type  : VTPManagementDomain,
477                        VLAN.Type                 : VLAN,
478                        Duplex.Type               : Duplex,
479                        TrustBitmap.Type          : TrustBitmap,
480                        UntrustedPortCoS.Type     : UntrustedPortCoS,
481                        ManagementAddresses.Type  : ManagementAddresses,
482                        MTU.Type                  : MTU,
483                        SystemName.Type           : SystemName,
484                        SystemObjectId.Type       : SystemObjectId,
485                        SnmpLocation.Type         : SnmpLocation
486                     }
487
488    @classmethod
489    def create(cls, aBuffer):
490#        print "CDPElementFactory.create aBuffer:", repr(aBuffer)
491#        print "CDPElementFactory.create sub_type:", repr(aBuffer[0:2])
492        _type = unpack("!h", aBuffer[0:2])[0]
493#        print "CDPElementFactory.create _type:", _type
494        try:
495            class_type = cls.elementTypeMap[_type]
496        except KeyError:
497            class_type = DummyCdpElement
498            #raise Exception("CDP Element type %s not implemented" % _type)
499        return class_type( aBuffer )
500