import hashlib
import ipaddress
from typing import Optional


def candidate_foundation(
    candidate_type: str, candidate_transport: str, base_address: str
) -> str:
    """
    Compute the foundation string of a candidate.

    See RFC 5245 - 4.1.1.3. Computing Foundations

    The foundation is the hexadecimal MD5 digest of the candidate type,
    transport and base address joined with ``|``. The digest is used purely
    as an identifier here, not for any security purpose.

    :param candidate_type: The candidate type, e.g. ``"host"``.
    :param candidate_transport: The transport, e.g. ``"udp"``.
    :param base_address: The base address of the candidate.
    :return: A 32-character hexadecimal string.
    :raises UnicodeEncodeError: If the joined key is not pure ASCII.
    """
    key = "%s|%s|%s" % (candidate_type, candidate_transport, base_address)
    return hashlib.md5(key.encode("ascii")).hexdigest()


def candidate_priority(
    candidate_component: int, candidate_type: str, local_pref: int = 65535
) -> int:
    """
    Compute the priority of a candidate.

    See RFC 5245 - 4.1.2.1. Recommended Formula

    :param candidate_component: The component ID of the candidate.
    :param candidate_type: The candidate type; ``"host"``, ``"prflx"`` and
        ``"srflx"`` have dedicated type preferences, any other value gets a
        type preference of 0.
    :param local_pref: The local preference, 65535 by default.
    :return: The priority as an integer.
    """
    type_preferences = {
        "host": 126,
        "prflx": 110,
        "srflx": 100,
    }
    type_pref = type_preferences.get(candidate_type, 0)

    return (1 << 24) * type_pref + (1 << 8) * local_pref + (256 - candidate_component)


class Candidate:
    """
    An ICE candidate.
    """

    def __init__(
        self,
        foundation: str,
        component: int,
        transport: str,
        priority: int,
        host: str,
        port: int,
        type: str,
        related_address: Optional[str] = None,
        related_port: Optional[int] = None,
        tcptype: Optional[str] = None,
        generation: Optional[int] = None,
    ) -> None:
        """
        Store the candidate's properties as attributes of the same name.

        The optional properties default to `None`, in which case they are
        omitted from the output of :meth:`to_sdp`.
        """
        self.foundation = foundation
        self.component = component
        self.transport = transport
        self.priority = priority
        self.host = host
        self.port = port
        self.type = type
        self.related_address = related_address
        self.related_port = related_port
        self.tcptype = tcptype
        self.generation = generation

    @classmethod
    def from_sdp(cls, sdp: str) -> "Candidate":
        """
        Parse a :class:`Candidate` from SDP.

        .. code-block:: python

            Candidate.from_sdp(
                '6815297761 1 udp 659136 1.2.3.4 31102 typ host generation 0')

        The first eight whitespace-separated fields are positional; the
        seventh one (the ``typ`` keyword) is skipped. Any remaining fields
        are read as name / value pairs, of which ``raddr``, ``rport``,
        ``tcptype`` and ``generation`` are recognised and all others are
        ignored.

        :param sdp: The candidate description.
        :return: An instance of the class the method was called on.
        :raises ValueError: If there are fewer than eight fields, or if a
            numeric field cannot be converted to an integer.
        """
        bits = sdp.split()
        if len(bits) < 8:
            raise ValueError("SDP does not have enough properties")

        kwargs = {
            "foundation": bits[0],
            "component": int(bits[1]),
            "transport": bits[2],
            "priority": int(bits[3]),
            "host": bits[4],
            "port": int(bits[5]),
            "type": bits[7],
        }

        # Walk the extension attributes pairwise; a trailing name without a
        # value is dropped by zip().
        for name, value in zip(bits[8::2], bits[9::2]):
            if name == "raddr":
                kwargs["related_address"] = value
            elif name == "rport":
                kwargs["related_port"] = int(value)
            elif name == "tcptype":
                kwargs["tcptype"] = value
            elif name == "generation":
                kwargs["generation"] = int(value)

        # Instantiate through cls so that subclasses get their own type back.
        return cls(**kwargs)

    def to_sdp(self) -> str:
        """
        Return a string representation suitable for SDP.

        The optional properties are appended in the order ``raddr``,
        ``rport``, ``tcptype``, ``generation``, each only if it is not
        `None`.
        """
        sdp = "%s %d %s %d %s %d typ %s" % (
            self.foundation,
            self.component,
            self.transport,
            self.priority,
            self.host,
            self.port,
            self.type,
        )
        if self.related_address is not None:
            sdp += " raddr %s" % self.related_address
        if self.related_port is not None:
            sdp += " rport %s" % self.related_port
        if self.tcptype is not None:
            sdp += " tcptype %s" % self.tcptype
        if self.generation is not None:
            sdp += " generation %d" % self.generation
        return sdp

    def can_pair_with(self, other: "Candidate") -> bool:
        """
        Tell whether this candidate can be paired with `other`.

        A local candidate is paired with a remote candidate if and only if
        the two candidates have the same component ID, the same transport
        (compared case-insensitively) and the same IP address version.

        :param other: The candidate to compare against.
        :raises ValueError: If either host is not a literal IP address.
        """
        a = ipaddress.ip_address(self.host)
        b = ipaddress.ip_address(other.host)
        return (
            self.component == other.component
            and self.transport.lower() == other.transport.lower()
            and a.version == b.version
        )

    def __repr__(self) -> str:
        return "Candidate(%s)" % self.to_sdp()