1# -*- test-case-name: twisted.protocols.haproxy.test.test_v2parser -*-
2
3# Copyright (c) Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7IProxyParser implementation for version two of the PROXY protocol.
8"""
9
10import binascii
11import struct
12from typing import Callable, Tuple, Type, Union
13
14from zope.interface import implementer
15
16from constantly import ValueConstant, Values  # type: ignore[import]
17from typing_extensions import Literal
18
19from twisted.internet import address
20from twisted.python import compat
21from . import _info, _interfaces
22from ._exceptions import (
23    InvalidNetworkProtocol,
24    InvalidProxyHeader,
25    MissingAddressData,
26    convertError,
27)
28
29
class NetFamily(Values):
    """
    Address family constants for the 'family' field.

    The family is carried in the upper four bits of the family/protocol byte
    of a version two header, so each value is written here as the family
    number shifted into that upper nibble.
    """

    # No address family specified.
    UNSPEC = ValueConstant(0x0 << 4)
    # IPv4 addresses.
    INET = ValueConstant(0x1 << 4)
    # IPv6 addresses.
    INET6 = ValueConstant(0x2 << 4)
    # UNIX socket paths.
    UNIX = ValueConstant(0x3 << 4)
39
40
class NetProtocol(Values):
    """
    Transport protocol constants for the 'protocol' field.

    The protocol is carried in the lower four bits of the family/protocol
    byte of a version two header.
    """

    # No transport protocol specified.
    UNSPEC = ValueConstant(0x0)
    # Stream transport; reported as "TCP" for the IP address families.
    STREAM = ValueConstant(0x1)
    # Datagram transport; reported as "UDP" for the IP address families.
    DGRAM = ValueConstant(0x2)
49
50
# Bit masks selecting the upper and the lower four bits of a header byte.  The
# version/command byte and the family/protocol byte each pack two four-bit
# fields, which the parser separates with these masks.
_HIGH = 0xF0
_LOW = 0x0F

# Symbolic names of the two commands a version two header may carry.
_LOCALCOMMAND = "LOCAL"
_PROXYCOMMAND = "PROXY"
55
56
@implementer(_interfaces.IProxyParser)
class V2Parser:
    """
    PROXY protocol version two header parser.

    Version two of the PROXY protocol is a binary format.  As parsed here, a
    header is a 16 byte preamble -- the 12 byte L{PREFIX} signature, one
    version/command byte, one family/protocol byte and a big-endian unsigned
    16-bit length -- followed by that many further bytes, which begin with
    the source and destination address block.

    @ivar buffer: Bytes received through L{feed} that have not yet been
        consumed as a header.
    @type buffer: L{bytes}
    """

    # The fixed 12 byte signature every version two header starts with.
    PREFIX = b"\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A"
    # Accepted values of the upper nibble of the version/command byte.
    VERSIONS = [0x20]
    # Lower nibble of the version/command byte -> command name.
    COMMANDS = {0: _LOCALCOMMAND, 1: _PROXYCOMMAND}
    # struct formats of the address block, keyed by the complete
    # family/protocol byte (NetFamily value | NetProtocol value).
    ADDRESSFORMATS = {
        # INET (IPv4): source address, destination address, two ports.
        0x11: "!4s4s2H",  # STREAM
        0x12: "!4s4s2H",  # DGRAM
        # INET6 (IPv6): source address, destination address, two ports.
        0x21: "!16s16s2H",  # STREAM
        0x22: "!16s16s2H",  # DGRAM
        # UNIX: source path, destination path; no ports.
        0x31: "!108s108s",  # STREAM
        0x32: "!108s108s",  # DGRAM
    }

    def __init__(self) -> None:
        self.buffer = b""

    def feed(
        self, data: bytes
    ) -> Union[Tuple[_info.ProxyInfo, bytes], Tuple[None, None]]:
        """
        Consume a chunk of data and attempt to parse it.

        @param data: A bytestring.
        @type data: bytes

        @return: A two-tuple containing, in order, a L{_interfaces.IProxyInfo}
            and any bytes fed to the parser that followed the end of the
            header.  Both of these values are None until a complete header is
            parsed.

        @raises InvalidProxyHeader: If the bytes fed to the parser create an
            invalid PROXY header, or if fewer than 16 bytes have been
            accumulated so far (the fixed preamble is not yet complete).
        """
        self.buffer += data
        # The 16 byte preamble must be present before the length field can be
        # read; a shorter buffer is rejected rather than waited on.
        if len(self.buffer) < 16:
            raise InvalidProxyHeader()

        # Bytes 14-15 hold the number of header bytes that follow the
        # preamble, so the complete header is that many bytes plus 16.
        size = struct.unpack("!H", self.buffer[14:16])[0] + 16
        if len(self.buffer) < size:
            # Header still incomplete; keep buffering.
            return (None, None)

        header, remaining = self.buffer[:size], self.buffer[size:]
        # Reset before parsing so the buffer is empty even if parse() raises.
        self.buffer = b""
        info = self.parse(header)
        return (info, remaining)

    @staticmethod
    def _bytesToIPv4(bytestring: bytes) -> bytes:
        """
        Convert packed 32-bit IPv4 address bytes into a dotted-quad ASCII bytes
        representation of that address.

        @param bytestring: 4 octets representing an IPv4 address.
        @type bytestring: L{bytes}

        @return: a dotted-quad notation IPv4 address.
        @rtype: L{bytes}
        """
        return b".".join(
            ("%i" % (ord(b),)).encode("ascii") for b in compat.iterbytes(bytestring)
        )

    @staticmethod
    def _bytesToIPv6(bytestring: bytes) -> bytes:
        """
        Convert packed 128-bit IPv6 address bytes into a colon-separated ASCII
        bytes representation of that address.

        Each of the eight 16-bit groups is rendered in lowercase hexadecimal
        without leading zeros; runs of zero groups are not collapsed.

        @param bytestring: 16 octets representing an IPv6 address.
        @type bytestring: L{bytes}

        @return: a colon-separated hexadecimal IPv6 address.
        @rtype: L{bytes}
        """
        hexString = binascii.b2a_hex(bytestring)
        # 32 hex digits, taken four at a time, give the eight address groups.
        return b":".join(
            (f"{int(hexString[b : b + 4], 16):x}").encode("ascii")
            for b in range(0, 32, 4)
        )

    @classmethod
    def parse(cls, line: bytes) -> _info.ProxyInfo:
        """
        Parse a bytestring as a full PROXY protocol header.

        @param line: A bytestring that represents a valid HAProxy PROXY
            protocol version 2 header.
        @type line: bytes

        @return: A L{_interfaces.IProxyInfo} containing the
            parsed data.  The source and destination are both None for a
            LOCAL command and for an UNSPEC family or protocol.

        @raises InvalidProxyHeader: If the bytestring does not represent a
            valid PROXY header: it is too short to hold the version/command
            and family/protocol bytes, the signature does not match, or the
            version or command is not recognised.

        @raises InvalidNetworkProtocol: If the family or protocol nibble is
            not a known L{NetFamily} or L{NetProtocol} value.

        @raises MissingAddressData: If the header does not carry enough bytes
            for the address block its family/protocol byte calls for.
        """
        with convertError(IndexError, InvalidProxyHeader):
            # Index rather than slice: indexing bytes yields an int and raises
            # IndexError on a truncated header.  A slice would silently give
            # b"" and ord(b"") would escape as an unconverted TypeError.
            versionCommand = line[12]
            familyProto = line[13]

        if line[:12] != cls.PREFIX:
            raise InvalidProxyHeader()

        version, command = versionCommand & _HIGH, versionCommand & _LOW
        if version not in cls.VERSIONS or command not in cls.COMMANDS:
            raise InvalidProxyHeader()

        # A LOCAL command carries no addresses to report.
        if cls.COMMANDS[command] == _LOCALCOMMAND:
            return _info.ProxyInfo(line, None, None)

        with convertError(ValueError, InvalidNetworkProtocol):
            family = NetFamily.lookupByValue(familyProto & _HIGH)
            netproto = NetProtocol.lookupByValue(familyProto & _LOW)
        if family is NetFamily.UNSPEC or netproto is NetProtocol.UNSPEC:
            return _info.ProxyInfo(line, None, None)

        # Every remaining family/protocol combination has an entry in
        # ADDRESSFORMATS.  The address block starts right after the 16 byte
        # preamble; anything beyond it in the header is ignored here.
        addressFormat = cls.ADDRESSFORMATS[familyProto]
        addrInfo = line[16 : 16 + struct.calcsize(addressFormat)]

        if family is NetFamily.UNIX:
            with convertError(struct.error, MissingAddressData):
                source, dest = struct.unpack(addressFormat, addrInfo)
            # Paths occupy fixed 108 byte fields; drop the trailing NUL
            # padding.
            return _info.ProxyInfo(
                line,
                address.UNIXAddress(source.rstrip(b"\x00")),
                address.UNIXAddress(dest.rstrip(b"\x00")),
            )

        addrType: Union[Literal["TCP"], Literal["UDP"]] = "TCP"
        if netproto is NetProtocol.DGRAM:
            addrType = "UDP"

        # Default to IPv4 handling and switch to IPv6 where needed.
        addrCls: Union[
            Type[address.IPv4Address], Type[address.IPv6Address]
        ] = address.IPv4Address
        addrParser: Callable[[bytes], bytes] = cls._bytesToIPv4
        if family is NetFamily.INET6:
            addrCls = address.IPv6Address
            addrParser = cls._bytesToIPv6

        with convertError(struct.error, MissingAddressData):
            source, dest, sPort, dPort = struct.unpack(addressFormat, addrInfo)

        return _info.ProxyInfo(
            line,
            addrCls(addrType, addrParser(source).decode(), sPort),
            addrCls(addrType, addrParser(dest).decode(), dPort),
        )
218