1from typing import Optional 2 3from ddtrace.internal.compat import parse 4from ddtrace.vendor.dogstatsd import DogStatsd 5from ddtrace.vendor.dogstatsd import base 6 7 8def get_dogstatsd_client(url): 9 # type: (str) -> Optional[DogStatsd] 10 if not url: 11 return None 12 13 # url can be either of the form `udp://<host>:<port>` or `unix://<path>` 14 # also support without url scheme included 15 if url.startswith("/"): 16 url = "unix://" + url 17 elif "://" not in url: 18 url = "udp://" + url 19 20 parsed = parse.urlparse(url) 21 22 if parsed.scheme == "unix": 23 return DogStatsd(socket_path=parsed.path) 24 elif parsed.scheme == "udp": 25 return DogStatsd(host=parsed.hostname, port=base.DEFAULT_PORT if parsed.port is None else parsed.port) 26 27 raise ValueError("Unknown scheme `%s` for DogStatsD URL `{}`".format(parsed.scheme)) 28