1# Copyright 2015 by Carlos Pena.  All rights reserved.
2# This code is part of the Biopython distribution and governed by its
3# license.  Please see the LICENSE file that should have been included
4# as part of this package.
5
6"""Offline tests for two Entrez features.
7
8(1) the URL construction of NCBI's Entrez services.
9(2) setting a custom directory for DTD and XSD downloads.
10"""
11
12import unittest
13from unittest import mock
14import warnings
15from http.client import HTTPMessage
16from urllib.parse import urlparse, parse_qs
17
18from Bio import Entrez
19from Bio.Entrez import Parser
20
21
# Prefix of every E-utilities URL that the tests in this file expect:
URL_HEAD = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"

# Identification values that Bio.Entrez attaches to each request it builds:
Entrez.email = "biopython@biopython.org"
Entrez.api_key = "5cfd4026f9df285d6cfc723c662d74bcbe09"

# Parameters expected in every request, in the shape urllib.parse.parse_qs
# produces for a URL query string (or POST data): each value is a list.
QUERY_DEFAULTS = {
    name: [getattr(Entrez, name)] for name in ("tool", "email", "api_key")
}
34
35
def get_base_url(parsed):
    """Rebuild a URL string from a parsed URL, keeping only scheme, netloc and path.

    The query string (and anything after it) is dropped.

    :param parsed: object exposing ``scheme``, ``netloc`` and ``path`` string
        attributes, such as the result of urllib.parse.urlparse().
    :returns: ``scheme://netloc/path`` as a single string.
    """
    pieces = (parsed.scheme, "://", parsed.netloc, parsed.path)
    return "".join(pieces)
39
40
def mock_httpresponse(code=200, content_type="/xml"):
    """Build a stand-in for the response object that urlopen() returns.

    :param int code: Value stored on the response's "code" attribute.
    :param str content_type: Media type placed in the "Content-Type" header of the
        response's "headers" attribute (a charset suffix is appended). Entrez._open()
        looks at this header to decide whether the response data is plain text.
    :returns: a non-callable mock carrying the "code" and "headers" attributes.
    """
    header_value = content_type + "; charset=UTF-8"

    headers = HTTPMessage()
    headers.add_header("Content-Type", header_value)

    response = mock.NonCallableMock()
    response.code = code
    response.headers = headers
    return response
55
56
def patch_urlopen(**kwargs):
    """Return a context manager that swaps Bio.Entrez.urlopen for a mock.

    While the context manager is active, Bio.Entrez.urlopen is a unittest.mock.Mock
    that only records the arguments it is called with and hands back a mocked
    response object. The real urlopen is never invoked, so no request is made.

    :param kwargs: forwarded unchanged to mock_httpresponse() to build the response.
    """
    return unittest.mock.patch(
        "Bio.Entrez.urlopen", return_value=mock_httpresponse(**kwargs)
    )
66
67
def get_patched_get_url(patched_urlopen, testcase=None):
    """Get the URL of the GET request made to the patched urlopen() function.

    Expects that the patched function should have been called a single time with the url as the only
    positional argument and no keyword arguments.

    :param patched_urlopen: value returned when entering the context manager created by patch_urlopen.
    :type patched_urlopen: unittest.mock.Mock
    :param testcase: Test case currently being run, which is used to make asserts. When omitted,
        no checks are made on how the mock was called.
    :type testcase: unittest.TestCase
    :returns: the first positional argument of the recorded call.
    """
    if testcase is not None:
        # Check the call count before touching call_args: for a mock that was never
        # called, call_args is None and unpacking it would raise an opaque TypeError
        # instead of a readable test failure.
        testcase.assertEqual(patched_urlopen.call_count, 1)

    args, kwargs = patched_urlopen.call_args

    if testcase is not None:
        testcase.assertEqual(len(args), 1)
        testcase.assertEqual(len(kwargs), 0)

    return args[0]
87
88
def get_patched_post_args(patched_urlopen, testcase=None, decode=False):
    """Get the URL and content data of the POST request made to the patched urlopen() function.

    Expects that the patched function should have been called a single time with the url as the only
    positional argument and "data" as the only keyword argument. Returns a (url, data) tuple.

    :param patched_urlopen: value returned when entering the context manager created by patch_urlopen.
    :type patched_urlopen: unittest.mock.Mock
    :param testcase: Test case currently being run, which is used to make asserts. When omitted,
        no checks are made on how the mock was called.
    :type testcase: unittest.TestCase
    :param bool decode: Decode the value of the "data" keyword argument (as UTF-8) before returning
    :returns: tuple of the first positional argument and the (optionally decoded) "data" value.
    """
    if testcase is not None:
        # Check the call count before touching call_args: for a mock that was never
        # called, call_args is None and unpacking it would raise an opaque TypeError
        # instead of a readable test failure.
        testcase.assertEqual(patched_urlopen.call_count, 1)

    args, kwargs = patched_urlopen.call_args

    if testcase is not None:
        testcase.assertEqual(len(args), 1)
        testcase.assertEqual(list(kwargs), ["data"])

    data = kwargs["data"]
    if decode:
        data = data.decode("utf8")

    return args[0], data
113
114
class TestURLConstruction(unittest.TestCase):
    """Offline tests for the URLs and POST data built by the Bio.Entrez request functions.

    Every request goes through patch_urlopen(), so urlopen is replaced by a mock and
    the tests only inspect the arguments it was called with.
    """

    def test_email_warning(self):
        """Test issuing warning when user does not specify email address."""
        saved_email = Entrez.email
        Entrez.email = None
        try:
            with warnings.catch_warnings(record=True) as w:
                # Record every warning regardless of the ambient warning filters,
                # so the outcome does not depend on how the test run was started.
                warnings.simplefilter("always")
                Entrez._construct_params(params=None)
            self.assertEqual(len(w), 1)
        finally:
            # Entrez.email is module-level state shared with the other tests;
            # restore it so this test does not change what they observe.
            Entrez.email = saved_email

    def test_construct_cgi_ecitmatch(self):
        """Test constructed url for an ecitmatch request."""
        citation = {
            "journal_title": "proc natl acad sci u s a",
            "year": "1991",
            "volume": "88",
            "first_page": "3248",
            "author_name": "mann bj",
            "key": "citation_1",
        }
        variables = Entrez._update_ecitmatch_variables(
            {"db": "pubmed", "bdata": [citation]}
        )

        with patch_urlopen() as patched:
            Entrez.ecitmatch(**variables)

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "ecitmatch.cgi")
        # The "bdata" value itself is not verified yet; drop it before comparing.
        query.pop("bdata")  # TODO
        self.assertDictEqual(
            query, {"retmode": ["xml"], "db": [variables["db"]], **QUERY_DEFAULTS},
        )

    def test_construct_cgi_einfo(self):
        """Test constructed url for request to Entrez."""
        with patch_urlopen() as patched:
            Entrez.einfo()

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "einfo.fcgi")
        self.assertDictEqual(query, QUERY_DEFAULTS)

    def test_construct_cgi_epost1(self):
        """Test POST data of an epost request with IDs given as one comma-separated string."""
        variables = {"db": "nuccore", "id": "186972394,160418"}

        with patch_urlopen() as patched:
            Entrez.epost(**variables)

        result_url, options = get_patched_post_args(patched, self, decode=True)
        query = parse_qs(options)

        self.assertEqual(result_url, URL_HEAD + "epost.fcgi")  # Params in POST data
        self.assertDictEqual(
            query, {"db": [variables["db"]], "id": [variables["id"]], **QUERY_DEFAULTS},
        )

    def test_construct_cgi_epost2(self):
        """Test POST data of an epost request with IDs given as a list."""
        variables = {"db": "nuccore", "id": ["160418", "160351"]}

        with patch_urlopen() as patched:
            Entrez.epost(**variables)

        result_url, options = get_patched_post_args(patched, self, decode=True)
        query = parse_qs(options)

        self.assertEqual(result_url, URL_HEAD + "epost.fcgi")  # Params in POST data
        # Compare IDs up to reordering:
        self.assertCountEqual(query.pop("id"), variables["id"])
        self.assertDictEqual(
            query, {"db": [variables["db"]], **QUERY_DEFAULTS},
        )

    def test_construct_cgi_elink1(self):
        """Test constructed url for an elink request; None-valued arguments are not expected in it."""
        variables = {
            "cmd": "neighbor_history",
            "db": "nucleotide",
            "dbfrom": "protein",
            "id": "22347800,48526535",
            "query_key": None,
            "webenv": None,
        }

        with patch_urlopen() as patched:
            Entrez.elink(**variables)

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "elink.fcgi")
        self.assertDictEqual(
            query,
            {
                "cmd": [variables["cmd"]],
                "db": [variables["db"]],
                "dbfrom": [variables["dbfrom"]],
                "id": [variables["id"]],
                **QUERY_DEFAULTS,
            },
        )

    def test_construct_cgi_elink2(self):
        """Commas: Link from protein to gene."""
        variables = {
            "db": "gene",
            "dbfrom": "protein",
            "id": "15718680,157427902,119703751",
        }

        with patch_urlopen() as patched:
            Entrez.elink(**variables)

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "elink.fcgi")
        self.assertDictEqual(
            query,
            {
                "db": [variables["db"]],
                "dbfrom": [variables["dbfrom"]],
                "id": [variables["id"]],
                **QUERY_DEFAULTS,
            },
        )

    def test_construct_cgi_elink3(self):
        """Multiple ID entries: Find one-to-one links from protein to gene."""
        variables = {
            "db": "gene",
            "dbfrom": "protein",
            "id": ["15718680", "157427902", "119703751"],
        }

        with patch_urlopen() as patched:
            Entrez.elink(**variables)

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "elink.fcgi")
        # Compare IDs up to reordering:
        self.assertCountEqual(query.pop("id"), variables["id"])
        self.assertDictEqual(
            query,
            {
                "db": [variables["db"]],
                "dbfrom": [variables["dbfrom"]],
                **QUERY_DEFAULTS,
            },
        )

    def test_construct_cgi_efetch(self):
        """Test constructed url for an efetch request."""
        variables = {
            "db": "protein",
            "id": "15718680,157427902,119703751",
            "retmode": "xml",
        }

        with patch_urlopen() as patched:
            Entrez.efetch(**variables)

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "efetch.fcgi")
        self.assertDictEqual(
            query,
            {
                "db": [variables["db"]],
                "id": [variables["id"]],
                "retmode": [variables["retmode"]],
                **QUERY_DEFAULTS,
            },
        )
298
299
class CustomDirectoryTest(unittest.TestCase):
    """Offline unit test for custom directory feature.

    Allow user to specify a custom directory for Entrez DTD/XSD files by setting
    Parser.DataHandler.directory.
    """

    def test_custom_directory(self):
        """Check that setting DataHandler.directory creates the local DTD and XSD directories."""
        import os
        import tempfile

        handler = Parser.DataHandler(validate=False, escape=False)

        # Use a context manager so the temporary directory is removed even when
        # one of the assertions below fails.
        with tempfile.TemporaryDirectory() as tmpdir:
            # Set the custom directory to the temporary directory.
            # This assignment statement will also initialize the local DTD and XSD
            # directories.
            # NOTE(review): DataHandler.directory keeps pointing at tmpdir after
            # this test finishes - confirm whether it should be reset afterwards.
            Parser.DataHandler.directory = tmpdir

            # Confirm that the two temp directories are named what we want.
            self.assertEqual(
                handler.local_dtd_dir, os.path.join(tmpdir, "Bio", "Entrez", "DTDs"),
            )
            self.assertEqual(
                handler.local_xsd_dir, os.path.join(tmpdir, "Bio", "Entrez", "XSDs"),
            )

            # And that they were created.
            self.assertTrue(os.path.isdir(handler.local_dtd_dir))
            self.assertTrue(os.path.isdir(handler.local_xsd_dir))
333
334
if __name__ == "__main__":
    # When executed as a script, run every test in this module with verbose output.
    unittest.main(testRunner=unittest.TextTestRunner(verbosity=2))
338