# Copyright 2015 by Carlos Pena.  All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license.  Please see the LICENSE file that should have been included
# as part of this package.

"""Offline tests for two Entrez features.

(1) the URL construction of NCBI's Entrez services.
(2) setting a custom directory for DTD and XSD downloads.
"""

import os
import shutil
import tempfile
import unittest
from unittest import mock
import warnings
from http.client import HTTPMessage
from urllib.parse import urlparse, parse_qs

from Bio import Entrez
from Bio.Entrez import Parser


# This lets us set the email address to be sent to NCBI Entrez:
Entrez.email = "biopython@biopython.org"
Entrez.api_key = "5cfd4026f9df285d6cfc723c662d74bcbe09"

URL_HEAD = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"

# Default values of URL query string (or POST data) when parsed with urllib.parse.parse_qs
QUERY_DEFAULTS = {
    "tool": [Entrez.tool],
    "email": [Entrez.email],
    "api_key": [Entrez.api_key],
}


def get_base_url(parsed):
    """Convert a parsed URL back to string but only include scheme, netloc, and path, omitting query."""
    return parsed.scheme + "://" + parsed.netloc + parsed.path


def mock_httpresponse(code=200, content_type="/xml"):
    """Create a mocked version of a response object returned by urlopen().

    :param int code: Value of "code" attribute.
    :param str content_type: Used to set the "Content-Type" header in the "headers" attribute. This
        is checked in Entrez._open() to determine if the response data is plain text.
    """
    resp = mock.NonCallableMock()
    resp.code = code

    resp.headers = HTTPMessage()
    resp.headers.add_header("Content-Type", content_type + "; charset=UTF-8")

    return resp


def patch_urlopen(**kwargs):
    """Create a context manager which replaces Bio.Entrez.urlopen with a mocked version.

    Within the managed context, Bio.Entrez.urlopen will be replaced with a unittest.mock.Mock
    object which when called simply records the arguments passed to it and returns a mocked response
    object. The actual urlopen function will not be called so no request will actually be made.

    Keyword arguments are forwarded to mock_httpresponse().
    """
    response = mock_httpresponse(**kwargs)
    return mock.patch("Bio.Entrez.urlopen", return_value=response)


def get_patched_get_url(patched_urlopen, testcase=None):
    """Get the URL of the GET request made to the patched urlopen() function.

    Expects that the patched function should have been called a single time with the url as the only
    positional argument and no keyword arguments.

    :param patched_urlopen: value returned when entering the context manager created by patch_urlopen.
    :type patched_urlopen: unittest.mock.Mock
    :param testcase: Test case currently being run, which is used to make asserts
    :type testcase: unittest.TestCase
    """
    if testcase is not None:
        # Check the call count before unpacking: call_args is None when the mock
        # was never called, and unpacking it would raise an unhelpful TypeError.
        testcase.assertEqual(patched_urlopen.call_count, 1)

    args, kwargs = patched_urlopen.call_args

    if testcase is not None:
        testcase.assertEqual(len(args), 1)
        testcase.assertEqual(len(kwargs), 0)

    return args[0]


def get_patched_post_args(patched_urlopen, testcase=None, decode=False):
    """Get the URL and content data of the POST request made to the patched urlopen() function.

    Expects that the patched function should have been called a single time with the url as the only
    positional argument and "data" as the only keyword argument. Returns a (url, data) tuple.

    :param patched_urlopen: value returned when entering the context manager created by patch_urlopen.
    :type patched_urlopen: unittest.mock.Mock
    :param testcase: Test case currently being run, which is used to make asserts
    :type testcase: unittest.TestCase
    :param bool decode: Decode the value of the "data" keyword argument before returning
    """
    if testcase is not None:
        # Check the call count before unpacking: call_args is None when the mock
        # was never called, and unpacking it would raise an unhelpful TypeError.
        testcase.assertEqual(patched_urlopen.call_count, 1)

    args, kwargs = patched_urlopen.call_args

    if testcase is not None:
        testcase.assertEqual(len(args), 1)
        testcase.assertEqual(list(kwargs), ["data"])

    data = kwargs["data"]
    if decode:
        data = data.decode("utf8")

    return args[0], data


class TestURLConstruction(unittest.TestCase):
    """Check the URLs (or POST data) built by the Entrez request functions."""

    def test_email_warning(self):
        """Test issuing warning when user does not specify email address."""
        # Patch instead of assigning so that Entrez.email is restored afterwards
        # and the missing address does not leak into other tests.
        with mock.patch.object(Entrez, "email", None):
            with warnings.catch_warnings(record=True) as w:
                # Make sure the warning is recorded whatever filters are active.
                warnings.simplefilter("always")
                Entrez._construct_params(params=None)

        self.assertEqual(len(w), 1)

    def test_construct_cgi_ecitmatch(self):
        """Test constructed url for request to ECitMatch."""
        citation = {
            "journal_title": "proc natl acad sci u s a",
            "year": "1991",
            "volume": "88",
            "first_page": "3248",
            "author_name": "mann bj",
            "key": "citation_1",
        }
        variables = Entrez._update_ecitmatch_variables(
            {"db": "pubmed", "bdata": [citation]}
        )

        with patch_urlopen() as patched:
            Entrez.ecitmatch(**variables)

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "ecitmatch.cgi")
        # The value of "bdata" is dropped without being checked.
        query.pop("bdata")  # TODO
        self.assertDictEqual(
            query, {"retmode": ["xml"], "db": [variables["db"]], **QUERY_DEFAULTS},
        )

    def test_construct_cgi_einfo(self):
        """Test constructed url for request to Entrez."""
        with patch_urlopen() as patched:
            Entrez.einfo()

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "einfo.fcgi")
        self.assertDictEqual(query, QUERY_DEFAULTS)

    def test_construct_cgi_epost1(self):
        """Test POST data for EPost with a comma-separated string of IDs."""
        variables = {"db": "nuccore", "id": "186972394,160418"}

        with patch_urlopen() as patched:
            Entrez.epost(**variables)

        result_url, options = get_patched_post_args(patched, self, decode=True)
        query = parse_qs(options)

        self.assertEqual(result_url, URL_HEAD + "epost.fcgi")  # Params in POST data
        self.assertDictEqual(
            query, {"db": [variables["db"]], "id": [variables["id"]], **QUERY_DEFAULTS},
        )

    def test_construct_cgi_epost2(self):
        """Test POST data for EPost with a list of IDs."""
        variables = {"db": "nuccore", "id": ["160418", "160351"]}

        with patch_urlopen() as patched:
            Entrez.epost(**variables)

        result_url, options = get_patched_post_args(patched, self, decode=True)
        query = parse_qs(options)

        self.assertEqual(result_url, URL_HEAD + "epost.fcgi")  # Params in POST data
        # Compare IDs up to reordering:
        self.assertCountEqual(query.pop("id"), variables["id"])
        self.assertDictEqual(
            query, {"db": [variables["db"]], **QUERY_DEFAULTS},
        )

    def test_construct_cgi_elink1(self):
        """Test constructed url for ELink; None-valued parameters are not expected in the query."""
        variables = {
            "cmd": "neighbor_history",
            "db": "nucleotide",
            "dbfrom": "protein",
            "id": "22347800,48526535",
            "query_key": None,
            "webenv": None,
        }

        with patch_urlopen() as patched:
            Entrez.elink(**variables)

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "elink.fcgi")
        self.assertDictEqual(
            query,
            {
                "cmd": [variables["cmd"]],
                "db": [variables["db"]],
                "dbfrom": [variables["dbfrom"]],
                "id": [variables["id"]],
                **QUERY_DEFAULTS,
            },
        )

    def test_construct_cgi_elink2(self):
        """Commas: Link from protein to gene."""
        variables = {
            "db": "gene",
            "dbfrom": "protein",
            "id": "15718680,157427902,119703751",
        }

        with patch_urlopen() as patched:
            Entrez.elink(**variables)

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "elink.fcgi")
        self.assertDictEqual(
            query,
            {
                "db": [variables["db"]],
                "dbfrom": [variables["dbfrom"]],
                "id": [variables["id"]],
                **QUERY_DEFAULTS,
            },
        )

    def test_construct_cgi_elink3(self):
        """Multiple ID entries: Find one-to-one links from protein to gene."""
        variables = {
            "db": "gene",
            "dbfrom": "protein",
            "id": ["15718680", "157427902", "119703751"],
        }

        with patch_urlopen() as patched:
            Entrez.elink(**variables)

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "elink.fcgi")
        # Compare IDs up to reordering:
        self.assertCountEqual(query.pop("id"), variables["id"])
        self.assertDictEqual(
            query,
            {
                "db": [variables["db"]],
                "dbfrom": [variables["dbfrom"]],
                **QUERY_DEFAULTS,
            },
        )

    def test_construct_cgi_efetch(self):
        """Test constructed url for request to EFetch."""
        variables = {
            "db": "protein",
            "id": "15718680,157427902,119703751",
            "retmode": "xml",
        }

        with patch_urlopen() as patched:
            Entrez.efetch(**variables)

        result_url = get_patched_get_url(patched, self)
        parsed = urlparse(result_url)
        query = parse_qs(parsed.query)

        self.assertEqual(get_base_url(parsed), URL_HEAD + "efetch.fcgi")
        self.assertDictEqual(
            query,
            {
                "db": [variables["db"]],
                "id": [variables["id"]],
                "retmode": [variables["retmode"]],
                **QUERY_DEFAULTS,
            },
        )


class CustomDirectoryTest(unittest.TestCase):
    """Offline unit test for custom directory feature.

    Allow user to specify a custom directory for Entrez DTD/XSD files by setting
    Parser.DataHandler.directory.
    """

    def test_custom_directory(self):
        """Test that setting a custom directory creates the DTD and XSD subdirectories."""
        handler = Parser.DataHandler(validate=False, escape=False)

        # Create a temporary directory
        tmpdir = tempfile.mkdtemp()
        # Register removal straight away so the directory is deleted even when
        # one of the assertions below fails.
        self.addCleanup(shutil.rmtree, tmpdir)
        # Set the custom directory to the temporary directory.
        # This assignment statement will also initialize the local DTD and XSD
        # directories.
        # NOTE(review): the class-level attribute is not restored after the
        # test - confirm whether later tests depend on its previous value.
        Parser.DataHandler.directory = tmpdir

        # Confirm that the two temp directories are named what we want.
        self.assertEqual(
            handler.local_dtd_dir, os.path.join(tmpdir, "Bio", "Entrez", "DTDs"),
        )
        self.assertEqual(
            handler.local_xsd_dir, os.path.join(tmpdir, "Bio", "Entrez", "XSDs"),
        )

        # And that they were created.
        self.assertTrue(os.path.isdir(handler.local_dtd_dir))
        self.assertTrue(os.path.isdir(handler.local_xsd_dir))


if __name__ == "__main__":
    runner = unittest.TextTestRunner(verbosity=2)
    unittest.main(testRunner=runner)