1# $Id: gre.py 75 2010-08-03 14:42:19Z jon.oberheide $
2# -*- coding: utf-8 -*-
3"""Generic Routing Encapsulation."""
4from __future__ import absolute_import
5
6import struct
7import codecs
8
9from . import dpkt
10from . import ethernet
11from .compat import compat_izip
12
# Individual bits of the 16-bit GRE flags word.
GRE_CP = 1 << 15  # Checksum Present
GRE_RP = 1 << 14  # Routing Present
GRE_KP = 1 << 13  # Key Present
GRE_SP = 1 << 12  # Sequence Present
GRE_SS = 1 << 11  # Strict Source Route
GRE_AP = 1 << 7   # Acknowledgment Present

# Optional header fields as (flag mask, attribute name, struct format code),
# listed in the order they follow the fixed header.  'sum' and 'off' share a
# mask: both are selected when either Checksum Present or Routing Present is
# set.
GRE_opt_fields = (
    (GRE_CP | GRE_RP, 'sum', 'H'),
    (GRE_CP | GRE_RP, 'off', 'H'),
    (GRE_KP, 'key', 'I'),
    (GRE_SP, 'seq', 'I'),
    (GRE_AP, 'ack', 'I'),
)
24
25
class GRE(dpkt.Packet):
    """Generic Routing Encapsulation.

    The fixed header is a 16-bit ``flags`` word followed by a 16-bit protocol
    type ``p``.  Optional fields selected by ``flags`` and the version ``v``
    (see ``opt_fields_fmts``) are decoded into instance attributes named
    after the field: ``sum``, ``off``, ``key``, ``seq``, ``ack`` and, for a
    non-zero version, ``len`` and ``callid``.

    Attributes:
        __hdr__: Header fields of GRE.
            flags: Flag bits (``GRE_*`` constants), recursion control and
                version packed into 16 bits.
            p: Protocol type of the payload; defaults to 0x0800
                (ETH_TYPE_IP).
        sre: Source route entries parsed when ``GRE_RP`` is set; an empty
            tuple until then, a list of ``GRE.SRE`` afterwards.
    """

    __hdr__ = (
        ('flags', 'H', 0),
        ('p', 'H', 0x0800),  # ETH_TYPE_IP
    )
    sre = ()

    @property
    def v(self):
        """Version number: the low three bits of ``flags``."""
        return self.flags & 0x7

    @v.setter
    def v(self, v):
        self.flags = (self.flags & ~0x7) | (v & 0x7)

    @property
    def recur(self):
        """Recursion control: the three bits 0x0700 of ``flags``.

        RFC 1701 places Recur directly after the C/R/K/S/s bits, i.e. just
        below ``GRE_SS`` (0x0800).  Reading it from there keeps it clear of
        ``GRE_AP`` (0x0080), so changing ``recur`` never disturbs that flag.
        """
        return (self.flags >> 8) & 0x7

    @recur.setter
    def recur(self, v):
        self.flags = (self.flags & ~0x0700) | ((v & 0x7) << 8)

    class SRE(dpkt.Packet):
        """Source Route Entry: a 4-byte header followed by ``len`` data bytes."""

        __hdr__ = [
            ('family', 'H', 0),
            ('off', 'B', 0),
            ('len', 'B', 0)
        ]

        def unpack(self, buf):
            dpkt.Packet.unpack(self, buf)
            # Keep only this entry's own data; whatever follows in the buffer
            # belongs to the next entry or to the payload.
            self.data = self.data[:self.len]

    def opt_fields_fmts(self):
        """Return the optional fields present in this header.

        Returns:
            A ``(fields, fmts)`` pair of parallel lists: attribute names and
            their struct format codes, in wire order.  For version 0 every
            entry of ``GRE_opt_fields`` whose mask intersects ``flags`` is
            included.  For any other version ``len`` and ``callid`` are always
            present and only the last two table entries (``seq`` and ``ack``)
            are flag-controlled.
        """
        if self.v == 0:
            fields, fmts = [], []
            opt_fields = GRE_opt_fields
        else:
            fields, fmts = ['len', 'callid'], ['H', 'H']
            opt_fields = GRE_opt_fields[-2:]
        for flags, field, fmt in opt_fields:
            if self.flags & flags:
                fields.append(field)
                fmts.append(fmt)
        return fields, fmts

    def unpack(self, buf):
        """Parse ``buf`` into header, optional fields, route entries and payload.

        Raises:
            dpkt.NeedData: if ``buf`` is too short for the fixed header (raised
                by the base class), for the optional fields announced by
                ``flags``, or for a source route entry header.
        """
        dpkt.Packet.unpack(self, buf)
        fields, fmts = self.opt_fields_fmts()
        if fields:
            # Size and decode with the same explicit network byte order so the
            # length never depends on native alignment rules.
            fmt = '!' + ''.join(fmts)
            fmtlen = struct.calcsize(fmt)
            if len(self.data) < fmtlen:
                # Report truncation as an unpack error instead of leaking a
                # raw struct.error to callers that only catch UnpackError.
                raise dpkt.NeedData(
                    'got %d, %d needed at least' % (len(self.data), fmtlen))
            vals = struct.unpack(fmt, self.data[:fmtlen])
            self.data = self.data[fmtlen:]
            self.__dict__.update(dict(compat_izip(fields, vals)))
        if self.flags & GRE_RP:
            l_ = []
            # Entries follow each other until one with a zero length, which
            # is itself kept as the last element of the list.
            while True:
                sre = self.SRE(self.data)
                self.data = self.data[len(sre):]
                l_.append(sre)
                if not sre.len:
                    break
            self.sre = l_
        try:
            self.data = ethernet.Ethernet._typesw[self.p](self.data)
            setattr(self, self.data.__class__.__name__.lower(), self.data)
        except (KeyError, dpkt.UnpackError):
            # Unknown protocol type or undecodable payload: self.data already
            # holds the raw bytes, so leave it as is.
            pass

    def __len__(self):
        """Return the serialized size: header, options, route entries, payload."""
        opt_fmtlen = struct.calcsize('!' + ''.join(self.opt_fields_fmts()[1]))
        return self.__hdr_len__ + opt_fmtlen + sum(map(len, self.sre)) + len(self.data)

    def __bytes__(self):
        """Serialize header, optional fields, route entries and payload."""
        fields, fmts = self.opt_fields_fmts()
        if fields:
            vals = [getattr(self, f) for f in fields]
            opt_s = struct.pack('!' + ''.join(fmts), *vals)
        else:
            opt_s = b''
        return self.pack_hdr() + opt_s + b''.join(map(bytes, self.sre)) + bytes(self.data)
121
122
def test_gre_v1():
    """Decode two version 1 GRE packets and check every parsed field."""
    # First packet: sequence and acknowledgment numbers are both present.
    payload = b"A" * 103
    raw = codecs.decode("3081880a0067178000068fb100083a76", 'hex') + payload
    pkt = GRE(raw)

    assert pkt.v == 1
    assert pkt.p == 0x880a
    assert pkt.seq == 430001
    assert pkt.ack == 539254
    assert pkt.callid == 6016
    assert pkt.len == 103
    assert pkt.data == payload
    assert len(pkt) == len(raw)

    # Second packet: only the sequence number follows len/callid.
    payload = b"A" * 178
    raw = codecs.decode("3001880a00b2001100083ab8", 'hex') + payload
    pkt = GRE(raw)

    assert pkt.v == 1
    assert pkt.p == 0x880a
    assert pkt.seq == 539320
    assert pkt.callid == 17
    assert pkt.len == 178
    assert pkt.data == payload
    assert len(pkt) == len(raw)
147
148
def test_gre_len():
    """Check len() for a default packet and a parse/serialize round trip."""
    from binascii import unhexlify

    # A default-constructed packet consists of the 4-byte fixed header only.
    empty = GRE()
    assert len(empty) == 4

    header = unhexlify("3081880a0067178000068fb100083a76")
    raw = header + b"\x41" * 103
    parsed = GRE(raw)
    assert bytes(parsed) == raw
    assert len(parsed) == len(raw)
159
160
def test_gre_accessors():
    """Each bit-field accessor starts at 0 and reads back a value written to it."""
    pkt = GRE()
    for name in ('v', 'recur'):
        print(name)
        assert hasattr(pkt, name)
        initial = getattr(pkt, name)
        assert initial == 0
        setattr(pkt, name, 1)
        updated = getattr(pkt, name)
        assert updated == 1
169
170
def test_sre_creation():
    """A single source route entry keeps its data and serializes unchanged."""
    from binascii import unhexlify

    # family=0x0000, off=0x00, len=0x02
    header = unhexlify('00000002')
    route_data = unhexlify('ffff')
    raw = header + route_data

    entry = GRE.SRE(raw)
    assert entry.data == b'\xff\xff'
    assert len(entry) == 6
    assert bytes(entry) == raw
184
185
def test_gre_nested_sre():
    """A packet with Routing Present yields a list of its source route entries."""
    from binascii import unhexlify

    fixed_header = unhexlify('40000800')   # flags=GRE_RP, p=ETH_TYPE_IP
    sum_and_off = unhexlify('00010002')    # sum=1, off=2
    first_entry = unhexlify('00030402' 'ffff')  # family=3, off=4, len=2 + data
    last_entry = unhexlify('00060000')     # family=6, off=0, len=0 => last element
    raw = fixed_header + sum_and_off + first_entry + last_entry

    pkt = GRE(raw)
    assert hasattr(pkt, 'sre')
    assert isinstance(pkt.sre, list)
    assert len(pkt.sre) == 2
    assert len(pkt) == len(raw)
    assert bytes(pkt) == raw
    assert pkt.data == b''
215
216
def test_gre_next_layer():
    """The payload is decoded by the handler registered for protocol type ``p``."""
    from binascii import unhexlify

    from . import ipx

    dst = unhexlify('0102030405060708090a0b0c')
    src = unhexlify('c0b0a0908070605040302010')
    gre_header = unhexlify('00008137')      # flags=NONE, p=ETH_TYPE_IPX
    ipx_header = unhexlify('000000010203')  # sum=0, len=1, tc=2, pt=3
    raw = gre_header + ipx_header + dst + src

    pkt = GRE(raw)
    assert hasattr(pkt, 'ipx')
    inner = pkt.data
    assert isinstance(inner, ipx.IPX)
    assert inner.tc == 2
    assert inner.src == src
    assert inner.dst == dst
    assert len(pkt) == len(raw)
    assert bytes(pkt) == raw
242