1# $Id: pppoe.py 23 2006-11-08 15:45:33Z dugsong $
2# -*- coding: utf-8 -*-
3"""PPP-over-Ethernet."""
4from __future__ import absolute_import
5
6import struct
7import codecs
8
9from . import dpkt
10from . import ppp
11
12# RFC 2516 codes
13PPPoE_PADI = 0x09
14PPPoE_PADO = 0x07
15PPPoE_PADR = 0x19
16PPPoE_PADS = 0x65
17PPPoE_PADT = 0xA7
18PPPoE_SESSION = 0x00
19
20
class PPPoE(dpkt.Packet):
    """PPP-over-Ethernet header (RFC 2516).

    The first header byte holds two 4-bit fields, exposed through the ``v``
    (high nibble) and ``type`` (low nibble) properties. When ``code`` is
    ``PPPoE_SESSION`` the payload is additionally parsed as a PPP frame.

    Attributes:
        __hdr__: Header fields of PPPoE.
            _v_type: version (high nibble) and type (low nibble); default 0x11.
            code: PPPoE code, see the ``PPPoE_*`` module constants.
            session: session identifier.
            len: payload length.
        ppp: the parsed ``PPP`` payload; only set for session packets whose
            payload was unpacked successfully.
    """

    __hdr__ = (
        ('_v_type', 'B', 0x11),
        ('code', 'B', 0),
        ('session', 'H', 0),
        ('len', 'H', 0)  # payload length
    )

    @property
    def v(self):
        """Version: the high nibble of the first header byte."""
        return self._v_type >> 4

    @v.setter
    def v(self, v):
        # Mask to 4 bits so an out-of-range value cannot grow ``_v_type``
        # beyond the single byte it is packed into.
        self._v_type = ((v & 0xf) << 4) | (self._v_type & 0xf)

    @property
    def type(self):
        """Type: the low nibble of the first header byte."""
        return self._v_type & 0xf

    @type.setter
    def type(self, t):
        # Mask to 4 bits so an out-of-range value cannot overwrite the
        # version nibble.
        self._v_type = (self._v_type & 0xf0) | (t & 0xf)

    def unpack(self, buf):
        """Unpack the PPPoE header and, for session packets, the PPP payload.

        If the payload cannot be unpacked as PPP the error is swallowed and
        ``self.data`` keeps the raw payload bytes.
        """
        dpkt.Packet.unpack(self, buf)
        if self.code != PPPoE_SESSION:
            return
        try:
            # We need to use the pppoe.PPP header here, because PPPoE
            # doesn't do the normal encapsulation.
            self.data = self.ppp = PPP(self.data)
        except dpkt.UnpackError:
            pass
63
64
class PPP(ppp.PPP):
    """Light PPP header for PPPoE, which lacks the usual PPP encapsulation.

    Only the protocol field ``p`` is part of the header. It is declared as a
    single byte, but ``unpack``/``pack_hdr`` also handle the two-byte form.
    """

    __hdr__ = (
        # Usually two bytes; protocol compression (one byte) is not
        # recommended, but it is supported.
        ('p', 'B', ppp.PPP_IP),
    )

    def unpack(self, buf):
        """Unpack the protocol field and hand the payload to its parser.

        Raises:
            dpkt.NeedData: if the protocol field is two bytes wide but
                ``buf`` holds fewer than two bytes.
        """
        dpkt.Packet.unpack(self, buf)
        if self.p & ppp.PFC_BIT == 0:
            # PFC bit clear in the first byte: the protocol field is two
            # bytes wide, so re-read it from the start of the buffer and
            # drop its second byte from the payload.
            try:
                self.p = struct.unpack('>H', buf[:2])[0]
            except struct.error:
                raise dpkt.NeedData
            self.data = self.data[1:]
        try:
            self.data = self._protosw[self.p](self.data)
            setattr(self, self.data.__class__.__name__.lower(), self.data)
        except (KeyError, struct.error, dpkt.UnpackError):
            # No parser registered for this protocol, or the payload could
            # not be unpacked: leave the raw payload in ``self.data``.
            pass

    def pack_hdr(self):
        """Return the packed protocol field, one or two bytes long.

        The one-byte (compressed) form is used only when ``unpack`` would
        read it back as one byte, i.e. the value fits in a byte and has the
        PFC bit set. Everything else is packed as two bytes, so a value
        such as 0x0002 is not turned into a byte that ``unpack`` would
        treat as the first half of a two-byte field.

        Raises:
            dpkt.PackError: if the protocol value cannot be packed.
        """
        try:
            # Protocol compression is *not* recommended (RFC2516), but we do it anyway
            if self.p > 0xff or self.p & ppp.PFC_BIT == 0:
                return struct.pack('>H', self.p)
            return dpkt.Packet.pack_hdr(self)
        except struct.error as e:
            raise dpkt.PackError(str(e))
94
95
def test_pppoe_discovery():
    """Decode PADO and PADR discovery packets and re-pack the PADR header."""
    # PADO packet: code byte 0x07
    pado_raw = codecs.decode(
        "11070000002801010000010300046413"
        "85180102000442524153010400103d0f"
        "0587062484f2df32b9ddfd77bd5b",
        'hex')
    pado = PPPoE(pado_raw)

    assert pado.code == PPPoE_PADO
    assert (pado.v, pado.type) == (1, 1)

    # PADR packet: same bytes, only the code byte differs (0x19)
    padr_raw = codecs.decode(
        "11190000002801010000010300046413"
        "85180102000442524153010400103d0f"
        "0587062484f2df32b9ddfd77bd5b",
        'hex')
    padr = PPPoE(padr_raw)

    assert padr.code == PPPoE_PADR

    # packing the header must reproduce the first six input bytes
    assert padr.pack_hdr() == padr_raw[:6]
116
117
def test_pppoe_session():
    """Session packets expose their PPP payload for both test captures."""
    # First capture: LCP, protocol field 0xc021
    lcp_raw = codecs.decode("11000011000cc0210101000a050605fcd459", 'hex')
    lcp_pkt = PPPoE(lcp_raw)

    assert lcp_pkt.code == PPPoE_SESSION
    assert isinstance(lcp_pkt.ppp, PPP)
    lcp_frame = lcp_pkt.data
    assert lcp_frame.p == 0xc021   # LCP
    assert len(lcp_frame.data) == 10

    assert lcp_frame.pack_hdr() == b"\xc0\x21"

    # Second capture: protocol field sent as the two bytes 0x0057
    ip6_raw = codecs.decode(
        "110000110066005760000000003c3a40fc000000000000000000000000000001"
        "fc0000000002010000000000000100018100bf291f9700010102030405060708"
        "090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f202122232425262728"
        "292a2b2c2d2e2f3031323334",
        'hex')
    ip6_pkt = PPPoE(ip6_raw)
    assert ip6_pkt.code == PPPoE_SESSION
    assert isinstance(ip6_pkt.ppp, PPP)
    ip6_frame = ip6_pkt.data
    assert ip6_frame.p == ppp.PPP_IP6
    assert ip6_frame.data.p == 58   # ICMPv6

    # the header is packed back in its one-byte form
    assert ip6_pkt.ppp.pack_hdr() == b"\x57"
142
143
def test_ppp_packing():
    """pack_hdr gives one byte for the default protocol and two for LCP."""
    frame = PPP()
    default_hdr = frame.pack_hdr()
    assert default_hdr == b"\x21"

    # switch to LCP, which does not fit in a single byte
    frame.p = 0xc021
    lcp_hdr = frame.pack_hdr()
    assert lcp_hdr == b"\xc0\x21"
150
151
def test_ppp_short():
    """Unpacking the single byte 0x00 raises NeedData."""
    import pytest
    with pytest.raises(dpkt.NeedData):
        PPP(b"\x00")
155
156
def test_pppoe_properties():
    """The v and type properties start at 1 and read back what was set."""
    pkt = PPPoE()
    # (property name, expected default, value to assign)
    cases = (
        ('v', 1, 7),
        ('type', 1, 5),
    )
    for attr, default, assigned in cases:
        assert getattr(pkt, attr) == default
        setattr(pkt, attr, assigned)
        assert getattr(pkt, attr) == assigned
166
167
def test_pppoe_unpack_error():
    """A session packet with an unparsable payload keeps the raw bytes."""
    import binascii
    raw = binascii.unhexlify(
        "11"    # v/type
        "00"    # code
        "0011"  # session
        "0066"  # len

        "00"    # data
    )
    # constructing the packet swallows the UnpackError raised for the payload
    pkt = PPPoE(raw)
    # the payload is still available, unparsed
    assert pkt.data == b'\x00'
182
183
def test_ppp_pack_hdr():
    """A protocol value that cannot be packed raises PackError."""
    import pytest
    import binascii

    raw = binascii.unhexlify(
        '01'  # protocol, with compression bit set

        'ff'  # incomplete data
    )
    frame = PPP(raw)
    # value chosen by the test to make packing fail
    frame.p = 1234567
    pytest.raises(dpkt.PackError, frame.pack_hdr)
197
198
# XXX - TODO: parse the discovery-stage TAGs (TLVs), etc.
200