1# encoding: utf-8
2from __future__ import unicode_literals
3
4from collections import namedtuple
5import contextlib
6import functools
7import socket
8import threading
9from twitter.util import find_links, follow_redirects, expand_line, parse_host_list
10
11try:
12    import http.server as BaseHTTPServer
13    import socketserver as SocketServer
14except ImportError:
15    import BaseHTTPServer
16    import SocketServer
17
18
19def test_find_links():
20    assert find_links("nix") == ("nix", [])
21    assert find_links("http://abc") == ("%s", ["http://abc"])
22    assert find_links("t http://abc") == ("t %s", ["http://abc"])
23    assert find_links("http://abc t") == ("%s t", ["http://abc"])
24    assert find_links("1 http://a 2 http://b 3") == ("1 %s 2 %s 3",
25        ["http://a", "http://b"])
26    assert find_links("%") == ("%%", [])
27    assert find_links("(http://abc)") == ("(%s)", ["http://abc"])
28
29
30Response = namedtuple('Response', 'path code headers')
31
32@contextlib.contextmanager
33def start_server(*resp):
34    """HTTP server replying with the given responses to the expected
35    requests."""
36    def url(port, path):
37        return 'http://%s:%s%s' % (socket.gethostname().lower(), port, path)
38
39    responses = list(reversed(resp))
40
41    class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
42        def do_HEAD(self):
43            response = responses.pop()
44            assert response.path == self.path
45            self.send_response(response.code)
46            for header, value in list(response.headers.items()):
47                self.send_header(header, value)
48            self.end_headers()
49
50    httpd = SocketServer.TCPServer(("", 0), MyHandler)
51    t = threading.Thread(target=httpd.serve_forever)
52    t.setDaemon(True)
53    t.start()
54    port = httpd.server_address[1]
55    yield functools.partial(url, port)
56    httpd.shutdown()
57
58def test_follow_redirects_direct_link():
59    link = "/resource"
60    with start_server(Response(link, 200, {})) as url:
61        assert url(link) == follow_redirects(url(link))
62
63def test_follow_redirects_redirected_link():
64    redirected = "/redirected"
65    link = "/resource"
66    with start_server(
67        Response(link, 301, {"Location": redirected}),
68        Response(redirected, 200, {})) as url:
69        assert url(redirected) == follow_redirects(url(link))
70
71def test_follow_redirects_unavailable():
72    link = "/resource"
73    with start_server(Response(link, 404, {})) as url:
74        assert url(link) == follow_redirects(url(link))
75
76def test_follow_redirects_link_to_last_available():
77    unavailable = "/unavailable"
78    link = "/resource"
79    with start_server(
80        Response(link, 301, {"Location": unavailable}),
81        Response(unavailable, 404, {})) as url:
82        assert url(unavailable) == follow_redirects(url(link))
83
84
85def test_follow_redirects_no_where():
86    link = "http://links.nowhere/"
87    assert link == follow_redirects(link)
88
89def test_follow_redirects_link_to_nowhere():
90    unavailable = "http://links.nowhere/"
91    link = "/resource"
92    with start_server(
93        Response(link, 301, {"Location": unavailable})) as url:
94        assert unavailable == follow_redirects(url(link))
95
96def test_follow_redirects_filtered_by_site():
97    link = "/resource"
98    with start_server() as url:
99        assert url(link) == follow_redirects(url(link), ["other_host"])
100
101
102def test_follow_redirects_filtered_by_site_after_redirect():
103    link = "/resource"
104    redirected = "/redirected"
105    filtered = "http://dont-follow/"
106    with start_server(
107        Response(link, 301, {"Location": redirected}),
108        Response(redirected, 301, {"Location": filtered})) as url:
109        hosts = [socket.gethostname().lower()]
110        assert filtered == follow_redirects(url(link), hosts)
111
112def test_follow_redirects_filtered_by_site_allowed():
113    redirected = "/redirected"
114    link = "/resource"
115    with start_server(
116        Response(link, 301, {"Location": redirected}),
117        Response(redirected, 200, {})) as url:
118        hosts = [socket.gethostname().lower()]
119        assert url(redirected) == follow_redirects(url(link), hosts)
120
121def test_expand_line():
122    redirected = "/redirected"
123    link = "/resource"
124    with start_server(
125        Response(link, 301, {"Location": redirected}),
126        Response(redirected, 200, {})) as url:
127        fmt = "before %s after"
128        line = fmt % url(link)
129        expected = fmt % url(redirected)
130        assert expected == expand_line(line, None)
131
132def test_parse_host_config():
133    assert set() == parse_host_list("")
134    assert set("h") == parse_host_list("h")
135    assert set(["1", "2"]) == parse_host_list("1,2")
136    assert set(["1", "2"]) == parse_host_list(" 1 , 2 ")
137
138