1# $Id: gre.py 75 2010-08-03 14:42:19Z jon.oberheide $ 2# -*- coding: utf-8 -*- 3"""Generic Routing Encapsulation.""" 4from __future__ import absolute_import 5 6import struct 7import codecs 8 9from . import dpkt 10from . import ethernet 11from .compat import compat_izip 12 13GRE_CP = 0x8000 # Checksum Present 14GRE_RP = 0x4000 # Routing Present 15GRE_KP = 0x2000 # Key Present 16GRE_SP = 0x1000 # Sequence Present 17GRE_SS = 0x0800 # Strict Source Route 18GRE_AP = 0x0080 # Acknowledgment Present 19 20GRE_opt_fields = ( 21 (GRE_CP | GRE_RP, 'sum', 'H'), (GRE_CP | GRE_RP, 'off', 'H'), 22 (GRE_KP, 'key', 'I'), (GRE_SP, 'seq', 'I'), (GRE_AP, 'ack', 'I') 23) 24 25 26class GRE(dpkt.Packet): 27 """Generic Routing Encapsulation. 28 29 TODO: Longer class information.... 30 31 Attributes: 32 __hdr__: Header fields of GRE. 33 TODO. 34 """ 35 36 __hdr__ = ( 37 ('flags', 'H', 0), 38 ('p', 'H', 0x0800), # ETH_TYPE_IP 39 ) 40 sre = () 41 42 @property 43 def v(self): 44 return self.flags & 0x7 45 46 @v.setter 47 def v(self, v): 48 self.flags = (self.flags & ~0x7) | (v & 0x7) 49 50 @property 51 def recur(self): 52 return (self.flags >> 5) & 0x7 53 54 @recur.setter 55 def recur(self, v): 56 self.flags = (self.flags & ~0xe0) | ((v & 0x7) << 5) 57 58 class SRE(dpkt.Packet): 59 __hdr__ = [ 60 ('family', 'H', 0), 61 ('off', 'B', 0), 62 ('len', 'B', 0) 63 ] 64 65 def unpack(self, buf): 66 dpkt.Packet.unpack(self, buf) 67 self.data = self.data[:self.len] 68 69 def opt_fields_fmts(self): 70 if self.v == 0: 71 fields, fmts = [], [] 72 opt_fields = GRE_opt_fields 73 else: 74 fields, fmts = ['len', 'callid'], ['H', 'H'] 75 opt_fields = GRE_opt_fields[-2:] 76 for flags, field, fmt in opt_fields: 77 if self.flags & flags: 78 fields.append(field) 79 fmts.append(fmt) 80 return fields, fmts 81 82 def unpack(self, buf): 83 dpkt.Packet.unpack(self, buf) 84 fields, fmts = self.opt_fields_fmts() 85 if fields: 86 fmt = ''.join(fmts) 87 fmtlen = struct.calcsize(fmt) 88 vals = struct.unpack("!" + fmt, self.data[:fmtlen]) 89 self.data = self.data[fmtlen:] 90 self.__dict__.update(dict(compat_izip(fields, vals))) 91 if self.flags & GRE_RP: 92 l_ = [] 93 while True: 94 sre = self.SRE(self.data) 95 self.data = self.data[len(sre):] 96 l_.append(sre) 97 if not sre.len: 98 break 99 self.sre = l_ 100 try: 101 self.data = ethernet.Ethernet._typesw[self.p](self.data) 102 setattr(self, self.data.__class__.__name__.lower(), self.data) 103 except (KeyError, dpkt.UnpackError): 104 # data alrady set 105 pass 106 107 def __len__(self): 108 opt_fmtlen = struct.calcsize(''.join(self.opt_fields_fmts()[1])) 109 return self.__hdr_len__ + opt_fmtlen + sum(map(len, self.sre)) + len(self.data) 110 111 def __bytes__(self): 112 fields, fmts = self.opt_fields_fmts() 113 if fields: 114 vals = [] 115 for f in fields: 116 vals.append(getattr(self, f)) 117 opt_s = struct.pack('!' + ''.join(fmts), *vals) 118 else: 119 opt_s = b'' 120 return self.pack_hdr() + opt_s + b''.join(map(bytes, self.sre)) + bytes(self.data) 121 122 123def test_gre_v1(): 124 # Runs all the test associated with this class/file 125 s = codecs.decode("3081880a0067178000068fb100083a76", 'hex') + b"A" * 103 126 g = GRE(s) 127 128 assert g.v == 1 129 assert g.p == 0x880a 130 assert g.seq == 430001 131 assert g.ack == 539254 132 assert g.callid == 6016 133 assert g.len == 103 134 assert g.data == b"A" * 103 135 assert len(g) == len(s) 136 137 s = codecs.decode("3001880a00b2001100083ab8", 'hex') + b"A" * 178 138 g = GRE(s) 139 140 assert g.v == 1 141 assert g.p == 0x880a 142 assert g.seq == 539320 143 assert g.callid == 17 144 assert g.len == 178 145 assert g.data == b"A" * 178 146 assert len(g) == len(s) 147 148 149def test_gre_len(): 150 from binascii import unhexlify 151 152 gre = GRE() 153 assert len(gre) == 4 154 155 buf = unhexlify("3081880a0067178000068fb100083a76") + b"\x41" * 103 156 gre = GRE(buf) 157 assert bytes(gre) == buf 158 assert len(gre) == len(buf) 159 160 161def test_gre_accessors(): 162 gre = GRE() 163 for attr in ['v', 'recur']: 164 print(attr) 165 assert hasattr(gre, attr) 166 assert getattr(gre, attr) == 0 167 setattr(gre, attr, 1) 168 assert getattr(gre, attr) == 1 169 170 171def test_sre_creation(): 172 from binascii import unhexlify 173 buf = unhexlify( 174 '0000' # family 175 '00' # off 176 '02' # len 177 178 'ffff' 179 ) 180 sre = GRE.SRE(buf) 181 assert sre.data == b'\xff\xff' 182 assert len(sre) == 6 183 assert bytes(sre) == buf 184 185 186def test_gre_nested_sre(): 187 from binascii import unhexlify 188 buf = unhexlify( 189 '4000' # flags (GRE_RP) 190 '0800' # p (ETH_TYPE_IP) 191 192 '0001' # sum 193 '0002' # off 194 195 # SRE entry 196 '0003' # family 197 '04' # off 198 '02' # len 199 200 'ffff' 201 202 # SRE entry (no len => last element) 203 '0006' # family 204 '00' # off 205 '00' # len 206 ) 207 208 gre = GRE(buf) 209 assert hasattr(gre, 'sre') 210 assert isinstance(gre.sre, list) 211 assert len(gre.sre) == 2 212 assert len(gre) == len(buf) 213 assert bytes(gre) == buf 214 assert gre.data == b'' 215 216 217def test_gre_next_layer(): 218 from binascii import unhexlify 219 220 from . import ipx 221 222 buf = unhexlify( 223 '0000' # flags (NONE) 224 '8137' # p (ETH_TYPE_IPX) 225 226 # IPX packet 227 '0000' # sum 228 '0001' # len 229 '02' # tc 230 '03' # pt 231 '0102030405060708090a0b0c' # dst 232 'c0b0a0908070605040302010' # src 233 ) 234 gre = GRE(buf) 235 assert hasattr(gre, 'ipx') 236 assert isinstance(gre.data, ipx.IPX) 237 assert gre.data.tc == 2 238 assert gre.data.src == unhexlify('c0b0a0908070605040302010') 239 assert gre.data.dst == unhexlify('0102030405060708090a0b0c') 240 assert len(gre) == len(buf) 241 assert bytes(gre) == buf 242