1import hashlib
2import ipaddress
3from typing import Optional
4
5
def candidate_foundation(
    candidate_type: str, candidate_transport: str, base_address: str
) -> str:
    """
    Compute the foundation string of a candidate.

    The candidate type, transport and base address are joined with ``|``
    and the hexadecimal MD5 digest of the ASCII-encoded result is returned,
    so identical inputs always produce the same foundation.

    See RFC 5245 - 4.1.1.3. Computing Foundations
    """
    fields = (candidate_type, candidate_transport, base_address)
    material = "|".join(map(str, fields))
    digest = hashlib.md5(material.encode("ascii"))
    return digest.hexdigest()
14
15
def candidate_priority(
    candidate_component: int, candidate_type: str, local_pref: int = 65535
) -> int:
    """
    Compute the priority of a candidate.

    The result is ``2^24 * type_pref + 2^8 * local_pref + (256 - component)``
    where ``type_pref`` is 126 for ``host``, 110 for ``prflx``, 100 for
    ``srflx`` and 0 for any other candidate type.

    See RFC 5245 - 4.1.2.1. Recommended Formula
    """
    type_preferences = {"host": 126, "prflx": 110, "srflx": 100}
    # Candidate types not listed above fall back to a type preference of 0.
    type_pref = type_preferences.get(candidate_type, 0)

    type_term = type_pref * 2 ** 24
    local_term = local_pref * 2 ** 8
    component_term = 256 - candidate_component
    return type_term + local_term + component_term
32
33
class Candidate:
    """
    An ICE candidate.

    Stores the fields of a single candidate and converts them to and from
    the whitespace-separated textual form used in SDP.
    """

    def __init__(
        self,
        foundation: str,
        component: int,
        transport: str,
        priority: int,
        host: str,
        port: int,
        type: str,
        related_address: Optional[str] = None,
        related_port: Optional[int] = None,
        tcptype: Optional[str] = None,
        generation: Optional[int] = None,
    ) -> None:
        """
        Store the given values as attributes of the same name.

        No validation or conversion is performed. The optional attributes
        default to ``None`` and are omitted by :meth:`to_sdp` when unset.
        """
        self.foundation = foundation
        self.component = component
        self.transport = transport
        self.priority = priority
        self.host = host
        self.port = port
        self.type = type
        self.related_address = related_address
        self.related_port = related_port
        self.tcptype = tcptype
        self.generation = generation

    @classmethod
    def from_sdp(cls, sdp: str) -> "Candidate":
        """
        Parse a :class:`Candidate` from SDP.

        .. code-block:: python

           Candidate.from_sdp(
            '6815297761 1 udp 659136 1.2.3.4 31102 typ host generation 0')

        The string is split on whitespace. The first six tokens are the
        foundation, component, transport, priority, host and port. The
        seventh token is skipped without being checked and the eighth is
        the candidate type. Any remaining tokens are read as ``name value``
        pairs: ``raddr``, ``rport``, ``tcptype`` and ``generation`` are
        recognised, other names are ignored, and a trailing name without a
        value is dropped.

        The instance is created through ``cls`` so that calling this method
        on a subclass returns an instance of that subclass.

        :raises ValueError: if the string has fewer than 8 tokens or a
            numeric field is not a valid integer.
        """
        bits = sdp.split()
        if len(bits) < 8:
            raise ValueError("SDP does not have enough properties")

        kwargs = {
            "foundation": bits[0],
            "component": int(bits[1]),
            "transport": bits[2],
            "priority": int(bits[3]),
            "host": bits[4],
            "port": int(bits[5]),
            # bits[6] is the separator token preceding the type; it is not read.
            "type": bits[7],
        }

        # Walk the optional attributes two tokens at a time; zip() stops at
        # the last complete pair, so a dangling name is silently ignored.
        for name, value in zip(bits[8::2], bits[9::2]):
            if name == "raddr":
                kwargs["related_address"] = value
            elif name == "rport":
                kwargs["related_port"] = int(value)
            elif name == "tcptype":
                kwargs["tcptype"] = value
            elif name == "generation":
                kwargs["generation"] = int(value)

        # Use cls rather than the hard-coded class so subclasses round-trip.
        return cls(**kwargs)

    def to_sdp(self) -> str:
        """
        Return a string representation suitable for SDP.

        The mandatory fields are always emitted; ``raddr``, ``rport``,
        ``tcptype`` and ``generation`` are appended, in that order, only
        when the corresponding attribute is not ``None``.
        """
        sdp = "%s %d %s %d %s %d typ %s" % (
            self.foundation,
            self.component,
            self.transport,
            self.priority,
            self.host,
            self.port,
            self.type,
        )
        if self.related_address is not None:
            sdp += " raddr %s" % self.related_address
        if self.related_port is not None:
            sdp += " rport %s" % self.related_port
        if self.tcptype is not None:
            sdp += " tcptype %s" % self.tcptype
        if self.generation is not None:
            sdp += " generation %d" % self.generation
        return sdp

    def can_pair_with(self, other: "Candidate") -> bool:
        """
        Tell whether this candidate can be paired with `other`.

        Two candidates can be paired if and only if they have the same
        component ID, the same transport (compared case-insensitively) and
        the same IP address version.

        :raises ValueError: if either host is not a valid IP address.
        """
        # Both hosts are parsed up front, so an invalid host raises even
        # when the component or transport already differ.
        a = ipaddress.ip_address(self.host)
        b = ipaddress.ip_address(other.host)
        return (
            self.component == other.component
            and self.transport.lower() == other.transport.lower()
            and a.version == b.version
        )

    def __repr__(self) -> str:
        """
        Return ``Candidate(<sdp>)`` where ``<sdp>`` is :meth:`to_sdp`.
        """
        return "Candidate(%s)" % self.to_sdp()
140