1# encoding: utf-8 2from __future__ import unicode_literals 3 4from collections import namedtuple 5import contextlib 6import functools 7import socket 8import threading 9from twitter.util import find_links, follow_redirects, expand_line, parse_host_list 10 11try: 12 import http.server as BaseHTTPServer 13 import socketserver as SocketServer 14except ImportError: 15 import BaseHTTPServer 16 import SocketServer 17 18 19def test_find_links(): 20 assert find_links("nix") == ("nix", []) 21 assert find_links("http://abc") == ("%s", ["http://abc"]) 22 assert find_links("t http://abc") == ("t %s", ["http://abc"]) 23 assert find_links("http://abc t") == ("%s t", ["http://abc"]) 24 assert find_links("1 http://a 2 http://b 3") == ("1 %s 2 %s 3", 25 ["http://a", "http://b"]) 26 assert find_links("%") == ("%%", []) 27 assert find_links("(http://abc)") == ("(%s)", ["http://abc"]) 28 29 30Response = namedtuple('Response', 'path code headers') 31 32@contextlib.contextmanager 33def start_server(*resp): 34 """HTTP server replying with the given responses to the expected 35 requests.""" 36 def url(port, path): 37 return 'http://%s:%s%s' % (socket.gethostname().lower(), port, path) 38 39 responses = list(reversed(resp)) 40 41 class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler): 42 def do_HEAD(self): 43 response = responses.pop() 44 assert response.path == self.path 45 self.send_response(response.code) 46 for header, value in list(response.headers.items()): 47 self.send_header(header, value) 48 self.end_headers() 49 50 httpd = SocketServer.TCPServer(("", 0), MyHandler) 51 t = threading.Thread(target=httpd.serve_forever) 52 t.setDaemon(True) 53 t.start() 54 port = httpd.server_address[1] 55 yield functools.partial(url, port) 56 httpd.shutdown() 57 58def test_follow_redirects_direct_link(): 59 link = "/resource" 60 with start_server(Response(link, 200, {})) as url: 61 assert url(link) == follow_redirects(url(link)) 62 63def test_follow_redirects_redirected_link(): 64 redirected = "/redirected" 65 link = "/resource" 66 with start_server( 67 Response(link, 301, {"Location": redirected}), 68 Response(redirected, 200, {})) as url: 69 assert url(redirected) == follow_redirects(url(link)) 70 71def test_follow_redirects_unavailable(): 72 link = "/resource" 73 with start_server(Response(link, 404, {})) as url: 74 assert url(link) == follow_redirects(url(link)) 75 76def test_follow_redirects_link_to_last_available(): 77 unavailable = "/unavailable" 78 link = "/resource" 79 with start_server( 80 Response(link, 301, {"Location": unavailable}), 81 Response(unavailable, 404, {})) as url: 82 assert url(unavailable) == follow_redirects(url(link)) 83 84 85def test_follow_redirects_no_where(): 86 link = "http://links.nowhere/" 87 assert link == follow_redirects(link) 88 89def test_follow_redirects_link_to_nowhere(): 90 unavailable = "http://links.nowhere/" 91 link = "/resource" 92 with start_server( 93 Response(link, 301, {"Location": unavailable})) as url: 94 assert unavailable == follow_redirects(url(link)) 95 96def test_follow_redirects_filtered_by_site(): 97 link = "/resource" 98 with start_server() as url: 99 assert url(link) == follow_redirects(url(link), ["other_host"]) 100 101 102def test_follow_redirects_filtered_by_site_after_redirect(): 103 link = "/resource" 104 redirected = "/redirected" 105 filtered = "http://dont-follow/" 106 with start_server( 107 Response(link, 301, {"Location": redirected}), 108 Response(redirected, 301, {"Location": filtered})) as url: 109 hosts = [socket.gethostname().lower()] 110 assert filtered == follow_redirects(url(link), hosts) 111 112def test_follow_redirects_filtered_by_site_allowed(): 113 redirected = "/redirected" 114 link = "/resource" 115 with start_server( 116 Response(link, 301, {"Location": redirected}), 117 Response(redirected, 200, {})) as url: 118 hosts = [socket.gethostname().lower()] 119 assert url(redirected) == follow_redirects(url(link), hosts) 120 121def test_expand_line(): 122 redirected = "/redirected" 123 link = "/resource" 124 with start_server( 125 Response(link, 301, {"Location": redirected}), 126 Response(redirected, 200, {})) as url: 127 fmt = "before %s after" 128 line = fmt % url(link) 129 expected = fmt % url(redirected) 130 assert expected == expand_line(line, None) 131 132def test_parse_host_config(): 133 assert set() == parse_host_list("") 134 assert set("h") == parse_host_list("h") 135 assert set(["1", "2"]) == parse_host_list("1,2") 136 assert set(["1", "2"]) == parse_host_list(" 1 , 2 ") 137 138