1# vim:fileencoding=utf-8:noet 2from __future__ import (unicode_literals, division, absolute_import, print_function) 3 4import re 5import os 6import socket 7 8from powerline.lib.url import urllib_read 9from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment 10from powerline.lib.monotonic import monotonic 11from powerline.lib.humanize_bytes import humanize_bytes 12from powerline.segments import with_docstring 13from powerline.theme import requires_segment_info 14 15 16@requires_segment_info 17def hostname(pl, segment_info, only_if_ssh=False, exclude_domain=False): 18 '''Return the current hostname. 19 20 :param bool only_if_ssh: 21 only return the hostname if currently in an SSH session 22 :param bool exclude_domain: 23 return the hostname without domain if there is one 24 ''' 25 if ( 26 segment_info['environ'].get('_POWERLINE_RUNNING_SHELL_TESTS') 27 == 'ee5bcdc6-b749-11e7-9456-50465d597777' 28 ): 29 return 'hostname' 30 if only_if_ssh and not segment_info['environ'].get('SSH_CLIENT'): 31 return None 32 if exclude_domain: 33 return socket.gethostname().split('.')[0] 34 return socket.gethostname() 35 36 37def _external_ip(query_url='http://ipv4.icanhazip.com/'): 38 return urllib_read(query_url).strip() 39 40 41class ExternalIpSegment(ThreadedSegment): 42 interval = 300 43 44 def set_state(self, query_url='http://ipv4.icanhazip.com/', **kwargs): 45 self.query_url = query_url 46 super(ExternalIpSegment, self).set_state(**kwargs) 47 48 def update(self, old_ip): 49 return _external_ip(query_url=self.query_url) 50 51 def render(self, ip, **kwargs): 52 if not ip: 53 return None 54 return [{'contents': ip, 'divider_highlight_group': 'background:divider'}] 55 56 57external_ip = with_docstring(ExternalIpSegment(), 58'''Return external IP address. 59 60:param str query_url: 61 URI to query for IP address, should return only the IP address as a text string 62 63 Suggested URIs: 64 65 * http://ipv4.icanhazip.com/ 66 * http://ipv6.icanhazip.com/ 67 * http://icanhazip.com/ (returns IPv6 address if available, else IPv4) 68 69Divider highlight group used: ``background:divider``. 70''') 71 72 73try: 74 import netifaces 75except ImportError: 76 def internal_ip(pl, interface='auto', ipv=4): 77 return None 78else: 79 _interface_starts = { 80 'eth': 10, # Regular ethernet adapters : eth1 81 'enp': 10, # Regular ethernet adapters, Gentoo : enp2s0 82 'en': 10, # OS X : en0 83 'ath': 9, # Atheros WiFi adapters : ath0 84 'wlan': 9, # Other WiFi adapters : wlan1 85 'wlp': 9, # Other WiFi adapters, Gentoo : wlp5s0 86 'teredo': 1, # miredo interface : teredo 87 'lo': -10, # Loopback interface : lo 88 'docker': -5, # Docker bridge interface : docker0 89 'vmnet': -5, # VMWare bridge interface : vmnet1 90 'vboxnet': -5, # VirtualBox bridge interface : vboxnet0 91 } 92 93 _interface_start_re = re.compile(r'^([a-z]+?)(\d|$)') 94 95 def _interface_key(interface): 96 match = _interface_start_re.match(interface) 97 if match: 98 try: 99 base = _interface_starts[match.group(1)] * 100 100 except KeyError: 101 base = 500 102 if match.group(2): 103 return base - int(match.group(2)) 104 else: 105 return base 106 else: 107 return 0 108 109 def internal_ip(pl, interface='auto', ipv=4): 110 family = netifaces.AF_INET6 if ipv == 6 else netifaces.AF_INET 111 if interface == 'auto': 112 try: 113 interface = next(iter(sorted(netifaces.interfaces(), key=_interface_key, reverse=True))) 114 except StopIteration: 115 pl.info('No network interfaces found') 116 return None 117 elif interface == 'default_gateway': 118 try: 119 interface = netifaces.gateways()['default'][family][1] 120 except KeyError: 121 pl.info('No default gateway found for IPv{0}', ipv) 122 return None 123 addrs = netifaces.ifaddresses(interface) 124 try: 125 return addrs[family][0]['addr'] 126 except (KeyError, IndexError): 127 pl.info("No IPv{0} address found for interface {1}", ipv, interface) 128 return None 129 130 131internal_ip = with_docstring(internal_ip, 132'''Return internal IP address 133 134Requires ``netifaces`` module to work properly. 135 136:param str interface: 137 Interface on which IP will be checked. Use ``auto`` to automatically 138 detect interface. In this case interfaces with lower numbers will be 139 preferred over interfaces with similar names. Order of preference based on 140 names: 141 142 #. ``eth`` and ``enp`` followed by number or the end of string. 143 #. ``ath``, ``wlan`` and ``wlp`` followed by number or the end of string. 144 #. ``teredo`` followed by number or the end of string. 145 #. Any other interface that is not ``lo*``. 146 #. ``lo`` followed by number or the end of string. 147 148 Use ``default_gateway`` to detect the interface based on the machine's 149 `default gateway <https://en.wikipedia.org/wiki/Default_gateway>`_ (i.e., 150 the router to which it is connected). 151 152:param int ipv: 153 4 or 6 for ipv4 and ipv6 respectively, depending on which IP address you 154 need exactly. 155''') 156 157 158try: 159 import psutil 160 161 def _get_bytes(interface): 162 try: 163 io_counters = psutil.net_io_counters(pernic=True) 164 except AttributeError: 165 io_counters = psutil.network_io_counters(pernic=True) 166 if_io = io_counters.get(interface) 167 if not if_io: 168 return None 169 return if_io.bytes_recv, if_io.bytes_sent 170 171 def _get_interfaces(): 172 try: 173 io_counters = psutil.net_io_counters(pernic=True) 174 except AttributeError: 175 io_counters = psutil.network_io_counters(pernic=True) 176 for interface, data in io_counters.items(): 177 if data: 178 yield interface, data.bytes_recv, data.bytes_sent 179except ImportError: 180 def _get_bytes(interface): 181 with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj: 182 rx = int(file_obj.read()) 183 with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj: 184 tx = int(file_obj.read()) 185 return (rx, tx) 186 187 def _get_interfaces(): 188 for interface in os.listdir('/sys/class/net'): 189 x = _get_bytes(interface) 190 if x is not None: 191 yield interface, x[0], x[1] 192 193 194class NetworkLoadSegment(KwThreadedSegment): 195 interfaces = {} 196 replace_num_pat = re.compile(r'[a-zA-Z]+') 197 198 @staticmethod 199 def key(interface='auto', **kwargs): 200 return interface 201 202 def compute_state(self, interface): 203 if interface == 'auto': 204 proc_exists = getattr(self, 'proc_exists', None) 205 if proc_exists is None: 206 proc_exists = self.proc_exists = os.path.exists('/proc/net/route') 207 if proc_exists: 208 # Look for default interface in routing table 209 with open('/proc/net/route', 'rb') as f: 210 for line in f.readlines(): 211 parts = line.split() 212 if len(parts) > 1: 213 iface, destination = parts[:2] 214 if not destination.replace(b'0', b''): 215 interface = iface.decode('utf-8') 216 break 217 if interface == 'auto': 218 # Choose interface with most total activity, excluding some 219 # well known interface names 220 interface, total = 'eth0', -1 221 for name, rx, tx in _get_interfaces(): 222 base = self.replace_num_pat.match(name) 223 if None in (base, rx, tx) or base.group() in ('lo', 'vmnet', 'sit'): 224 continue 225 activity = rx + tx 226 if activity > total: 227 total = activity 228 interface = name 229 230 try: 231 idata = self.interfaces[interface] 232 try: 233 idata['prev'] = idata['last'] 234 except KeyError: 235 pass 236 except KeyError: 237 idata = {} 238 if self.run_once: 239 idata['prev'] = (monotonic(), _get_bytes(interface)) 240 self.shutdown_event.wait(self.interval) 241 self.interfaces[interface] = idata 242 243 idata['last'] = (monotonic(), _get_bytes(interface)) 244 return idata.copy() 245 246 def render_one(self, idata, recv_format='DL {value:>8}', sent_format='UL {value:>8}', suffix='B/s', si_prefix=False, **kwargs): 247 if not idata or 'prev' not in idata: 248 return None 249 250 t1, b1 = idata['prev'] 251 t2, b2 = idata['last'] 252 measure_interval = t2 - t1 253 254 if None in (b1, b2): 255 return None 256 257 r = [] 258 for i, key in zip((0, 1), ('recv', 'sent')): 259 format = locals()[key + '_format'] 260 try: 261 value = (b2[i] - b1[i]) / measure_interval 262 except ZeroDivisionError: 263 self.warn('Measure interval zero.') 264 value = 0 265 max_key = key + '_max' 266 is_gradient = max_key in kwargs 267 hl_groups = ['network_load_' + key, 'network_load'] 268 if is_gradient: 269 hl_groups[:0] = (group + '_gradient' for group in hl_groups) 270 r.append({ 271 'contents': format.format(value=humanize_bytes(value, suffix, si_prefix)), 272 'divider_highlight_group': 'network_load:divider', 273 'highlight_groups': hl_groups, 274 }) 275 if is_gradient: 276 max = kwargs[max_key] 277 if value >= max: 278 r[-1]['gradient_level'] = 100 279 else: 280 r[-1]['gradient_level'] = value * 100.0 / max 281 282 return r 283 284 285network_load = with_docstring(NetworkLoadSegment(), 286'''Return the network load. 287 288Uses the ``psutil`` module if available for multi-platform compatibility, 289falls back to reading 290:file:`/sys/class/net/{interface}/statistics/{rx,tx}_bytes`. 291 292:param str interface: 293 Network interface to measure (use the special value "auto" to have powerline 294 try to auto-detect the network interface). 295:param str suffix: 296 String appended to each load string. 297:param bool si_prefix: 298 Use SI prefix, e.g. MB instead of MiB. 299:param str recv_format: 300 Format string that determines how download speed should look like. Receives 301 ``value`` as argument. 302:param str sent_format: 303 Format string that determines how upload speed should look like. Receives 304 ``value`` as argument. 305:param float recv_max: 306 Maximum number of received bytes per second. Is only used to compute 307 gradient level. 308:param float sent_max: 309 Maximum number of sent bytes per second. Is only used to compute gradient 310 level. 311 312Divider highlight group used: ``network_load:divider``. 313 314Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_load_recv_gradient`` (gradient) or ``network_load_gradient`` (gradient), ``network_load_sent`` or ``network_load_recv`` or ``network_load``. 315''') 316