1# This code is part of the Biopython distribution and governed by its 2# license. Please see the LICENSE file that should have been included 3# as part of this package. 4"""Dealing with storage of biopython objects in a BioSQL relational db.""" 5 6import configparser 7import os 8import platform 9import tempfile 10import time 11import unittest 12 13from io import StringIO 14 15# Hide annoying warnings from things like bonds in GenBank features, 16# or PostgreSQL schema rules. TODO - test these warnings are raised! 17import warnings 18from Bio import BiopythonWarning 19 20# local stuff 21from Bio import MissingExternalDependencyError 22from Bio.Seq import Seq, MutableSeq, UndefinedSequenceError 23from Bio.SeqFeature import SeqFeature, UnknownPosition, ExactPosition 24from Bio import SeqIO 25from Bio.SeqRecord import SeqRecord 26 27from BioSQL import BioSeqDatabase 28from BioSQL import BioSeq 29 30from seq_tests_common import SeqRecordTestBaseClass 31 32if __name__ == "__main__": 33 raise RuntimeError("Call this via test_BioSQL_*.py not directly") 34 35# Exporting these to the test_BioSQL_XXX.py files which import this file: 36# DBDRIVER, DBTYPE, DBHOST, DBUSER, DBPASSWD, TESTDB, DBSCHEMA, SQL_FILE, SYSTEM 37 38SYSTEM = platform.system() 39 40 41def load_biosql_ini(DBTYPE): 42 """Load the database settings from INI file.""" 43 if not os.path.isfile("biosql.ini"): 44 raise MissingExternalDependencyError( 45 "BioSQL test configuration file biosql.ini missing (see biosql.ini.sample)" 46 ) 47 48 config = configparser.ConfigParser() 49 config.read("biosql.ini") 50 DBHOST = config.get(DBTYPE, "dbhost") 51 DBUSER = config.get(DBTYPE, "dbuser") 52 DBPASSWD = config.get(DBTYPE, "dbpasswd") 53 TESTDB = config.get(DBTYPE, "testdb") 54 return DBHOST, DBUSER, DBPASSWD, TESTDB 55 56 57def temp_db_filename(): 58 """Generate a temporary filename for SQLite database.""" 59 # In memory SQLite does not work with current test structure since the tests 60 # expect databases to be retained between individual tests. 61 # TESTDB = ':memory:' 62 # Instead, we use (if we can) /dev/shm 63 try: 64 h, test_db_fname = tempfile.mkstemp("_BioSQL.db", dir="/dev/shm") 65 except OSError: 66 # We can't use /dev/shm 67 h, test_db_fname = tempfile.mkstemp("_BioSQL.db") 68 os.close(h) 69 return test_db_fname 70 71 72def check_config(dbdriver, dbtype, dbhost, dbuser, dbpasswd, testdb): 73 """Verify the database settings work for connecting.""" 74 global DBDRIVER, DBTYPE, DBHOST, DBUSER, DBPASSWD, TESTDB, DBSCHEMA 75 global SYSTEM, SQL_FILE 76 DBDRIVER = dbdriver 77 DBTYPE = dbtype 78 DBHOST = dbhost 79 DBUSER = dbuser 80 DBPASSWD = dbpasswd 81 TESTDB = testdb 82 83 if not DBDRIVER or not DBTYPE or not DBUSER: 84 # No point going any further... 85 raise MissingExternalDependencyError("Incomplete BioSQL test settings") 86 87 # Check the database driver is installed: 88 if SYSTEM == "Java": 89 try: 90 if DBDRIVER in ["MySQLdb"]: 91 import com.mysql.jdbc.Driver 92 elif DBDRIVER in ["psycopg2", "pgdb"]: 93 import org.postgresql.Driver 94 except ImportError: 95 message = "Install the JDBC driver for %s to use BioSQL " % DBTYPE 96 raise MissingExternalDependencyError(message) from None 97 else: 98 try: 99 __import__(DBDRIVER) 100 except ImportError: 101 if DBDRIVER in ["MySQLdb"]: 102 message = ( 103 "Install MySQLdb or mysqlclient if you want to use %s with BioSQL " 104 % (DBTYPE) 105 ) 106 else: 107 message = "Install %s if you want to use %s with BioSQL " % ( 108 DBDRIVER, 109 DBTYPE, 110 ) 111 raise MissingExternalDependencyError(message) from None 112 113 try: 114 if DBDRIVER in ["sqlite3"]: 115 server = BioSeqDatabase.open_database(driver=DBDRIVER, db=TESTDB) 116 else: 117 server = BioSeqDatabase.open_database( 118 driver=DBDRIVER, host=DBHOST, user=DBUSER, passwd=DBPASSWD 119 ) 120 server.close() 121 del server 122 except Exception as e: 123 message = "Connection failed, check settings if you plan to use BioSQL: %s" % e 124 raise MissingExternalDependencyError(message) from None 125 126 DBSCHEMA = "biosqldb-" + DBTYPE + ".sql" 127 SQL_FILE = os.path.join(os.getcwd(), "BioSQL", DBSCHEMA) 128 129 if not os.path.isfile(SQL_FILE): 130 message = "Missing SQL schema file: %s" % SQL_FILE 131 raise MissingExternalDependencyError(message) 132 133 134def _do_db_cleanup(): 135 """Cleanup everything from TESTDB. 136 137 Relevant for MySQL and PostgreSQL. 138 """ 139 if DBDRIVER in ["psycopg2", "pgdb"]: 140 # first open a connection the database 141 # notice that postgres doesn't have createdb privileges, so 142 # the TESTDB must exist 143 server = BioSeqDatabase.open_database( 144 driver=DBDRIVER, host=DBHOST, user=DBUSER, passwd=DBPASSWD, db=TESTDB 145 ) 146 147 # The pgdb postgres driver does not support autocommit, so here we 148 # commit the current transaction so that 'drop database' query will 149 # be outside a transaction block 150 server.adaptor.cursor.execute("COMMIT") 151 # drop anything in the database 152 # with Postgres, can get errors about database still being used. 153 # Wait briefly to be sure previous tests are done with it. 154 time.sleep(1) 155 # drop anything in the database 156 sql = r"DROP OWNED BY " + DBUSER 157 server.adaptor.cursor.execute(sql, ()) 158 server.close() 159 else: 160 # first open a connection to create the database 161 server = BioSeqDatabase.open_database( 162 driver=DBDRIVER, host=DBHOST, user=DBUSER, passwd=DBPASSWD 163 ) 164 # Auto-commit 165 try: 166 server.adaptor.autocommit() 167 except AttributeError: 168 pass 169 # drop the database 170 try: 171 sql = r"DROP DATABASE " + TESTDB 172 server.adaptor.cursor.execute(sql, ()) 173 except ( 174 server.module.OperationalError, 175 server.module.Error, 176 server.module.DatabaseError, 177 ) as e: # the database doesn't exist 178 pass 179 except ( 180 server.module.IntegrityError, 181 server.module.ProgrammingError, 182 ) as e: # ditto--perhaps 183 if str(e).find('database "%s" does not exist' % TESTDB) == -1: 184 server.close() 185 raise 186 # create a new database 187 sql = r"CREATE DATABASE " + TESTDB 188 server.adaptor.execute(sql, ()) 189 server.close() 190 191 192def create_database(): 193 """Delete any existing BioSQL test DB, then (re)create an empty BioSQL DB. 194 195 Returns TESTDB name which will change for for SQLite. 196 """ 197 if DBDRIVER in ["sqlite3"]: 198 global TESTDB 199 if os.path.exists(TESTDB): 200 try: 201 os.remove(TESTDB) 202 except Exception: 203 time.sleep(1) 204 try: 205 os.remove(TESTDB) 206 except Exception: 207 # Seen this with PyPy 2.1 (and older) on Windows - 208 # which suggests an open handle still exists? 209 print("Could not remove %r" % TESTDB) 210 pass 211 # Now pick a new filename - just in case there is a stale handle 212 # (which might be happening under Windows...) 213 TESTDB = temp_db_filename() 214 else: 215 _do_db_cleanup() 216 217 # now open a connection to load the database 218 server = BioSeqDatabase.open_database( 219 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 220 ) 221 try: 222 server.load_database_sql(SQL_FILE) 223 server.commit() 224 server.close() 225 except Exception: 226 # Failed, but must close the handle... 227 server.close() 228 raise 229 230 return TESTDB 231 232 233def destroy_database(): 234 """Delete any temporary BioSQL sqlite3 database files.""" 235 if DBDRIVER in ["sqlite3"]: 236 if os.path.exists(TESTDB): 237 os.remove(TESTDB) 238 239 240def load_database(gb_filename_or_handle): 241 """Load a GenBank file into a new BioSQL database. 242 243 This is useful for running tests against a newly created database. 244 """ 245 TESTDB = create_database() 246 # now open a connection to load the database 247 db_name = "biosql-test" 248 server = BioSeqDatabase.open_database( 249 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 250 ) 251 db = server.new_database(db_name) 252 253 # get the GenBank file we are going to put into it 254 iterator = SeqIO.parse(gb_filename_or_handle, "gb") 255 records = [] 256 for record in iterator: 257 if record.annotations.get("molecule_type") == "mRNA": 258 record.annotations["molecule_type"] = "DNA" 259 records.append(record) 260 # finally put it in the database 261 count = db.load(records) 262 server.commit() 263 server.close() 264 return count 265 266 267def load_multi_database(gb_filename_or_handle, gb_filename_or_handle2): 268 """Load two GenBank files into a new BioSQL database as different subdatabases. 269 270 This is useful for running tests against a newly created database. 271 """ 272 TESTDB = create_database() 273 # now open a connection to load the database 274 db_name = "biosql-test" 275 db_name2 = "biosql-test2" 276 server = BioSeqDatabase.open_database( 277 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 278 ) 279 db = server.new_database(db_name) 280 281 # get the GenBank file we are going to put into it 282 iterator = SeqIO.parse(gb_filename_or_handle, "gb") 283 count = db.load(iterator) 284 285 db = server.new_database(db_name2) 286 287 # get the GenBank file we are going to put into it 288 iterator = SeqIO.parse(gb_filename_or_handle2, "gb") 289 # finally put it in the database 290 count2 = db.load(iterator) 291 server.commit() 292 293 server.close() 294 return count + count2 295 296 297class MultiReadTest(unittest.TestCase): 298 """Test reading a database with multiple namespaces.""" 299 300 loaded_db = 0 301 302 def setUp(self): 303 """Connect to and load up the database.""" 304 load_multi_database("GenBank/cor6_6.gb", "GenBank/NC_000932.gb") 305 306 self.server = BioSeqDatabase.open_database( 307 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 308 ) 309 310 self.db = self.server["biosql-test"] 311 self.db2 = self.server["biosql-test2"] 312 313 def tearDown(self): 314 self.server.close() 315 destroy_database() 316 del self.db 317 del self.db2 318 del self.server 319 320 def test_server(self): 321 """Check BioSeqDatabase methods.""" 322 server = self.server 323 self.assertIn("biosql-test", server) 324 self.assertIn("biosql-test2", server) 325 self.assertEqual(2, len(server)) 326 self.assertEqual(["biosql-test", "biosql-test2"], list(server.keys())) 327 # Check we can delete the namespace... 328 del server["biosql-test"] 329 del server["biosql-test2"] 330 self.assertEqual(0, len(server)) 331 with self.assertRaises(KeyError): 332 del server["non-existant-name"] 333 334 def test_get_db_items(self): 335 """Check list, keys, length etc.""" 336 db = self.db 337 items = list(db.values()) 338 keys = list(db) 339 length = len(items) 340 self.assertEqual(length, len(db)) 341 self.assertEqual(length, len(list(db))) 342 self.assertEqual(length, len(list(db.items()))) 343 self.assertEqual(length, len(list(db.keys()))) 344 self.assertEqual(length, len(list(db.values()))) 345 for (k1, r1), (k2, r2) in zip(zip(keys, items), db.items()): 346 self.assertEqual(k1, k2) 347 self.assertEqual(r1.id, r2.id) 348 for k in keys: 349 del db[k] 350 self.assertEqual(0, len(db)) 351 with self.assertRaises(KeyError): 352 del db["non-existant-name"] 353 354 def test_cross_retrieval_of_items(self): 355 """Test that valid ids can't be retrieved between namespaces.""" 356 db = self.db 357 db2 = self.db2 358 for db2_id in db2.keys(): 359 with self.assertRaises(KeyError): 360 rec = db[db2_id] 361 362 363class ReadTest(unittest.TestCase): 364 """Test reading a database from an already built database.""" 365 366 loaded_db = 0 367 368 def setUp(self): 369 """Connect to and load up the database.""" 370 load_database("GenBank/cor6_6.gb") 371 372 self.server = BioSeqDatabase.open_database( 373 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 374 ) 375 376 self.db = self.server["biosql-test"] 377 378 def tearDown(self): 379 self.server.close() 380 destroy_database() 381 del self.db 382 del self.server 383 384 def test_server(self): 385 """Check BioSeqDatabase methods.""" 386 server = self.server 387 self.assertIn("biosql-test", server) 388 self.assertEqual(1, len(server)) 389 self.assertEqual(["biosql-test"], list(server.keys())) 390 # Check we can delete the namespace... 391 del server["biosql-test"] 392 self.assertEqual(0, len(server)) 393 with self.assertRaises(KeyError): 394 del server["non-existant-name"] 395 396 def test_get_db_items(self): 397 """Check list, keys, length etc.""" 398 db = self.db 399 items = list(db.values()) 400 keys = list(db) 401 length = len(items) 402 self.assertEqual(length, len(db)) 403 self.assertEqual(length, len(list(db.items()))) 404 self.assertEqual(length, len(list(db))) 405 self.assertEqual(length, len(list(db.values()))) 406 for (k1, r1), (k2, r2) in zip(zip(keys, items), db.items()): 407 self.assertEqual(k1, k2) 408 self.assertEqual(r1.id, r2.id) 409 for k in keys: 410 del db[k] 411 self.assertEqual(0, len(db)) 412 with self.assertRaises(KeyError): 413 del db["non-existant-name"] 414 415 def test_lookup_items(self): 416 """Test retrieval of items using various ids.""" 417 self.db.lookup(accession="X62281") 418 self.assertRaises(IndexError, self.db.lookup, accession="Not real") 419 self.db.lookup(display_id="ATKIN2") 420 self.assertRaises(IndexError, self.db.lookup, display_id="Not real") 421 422 # primary id retrieval 423 self.db.lookup(primary_id="16353") 424 self.assertRaises(IndexError, self.db.lookup, primary_id="Not Real") 425 426 427class SeqInterfaceTest(unittest.TestCase): 428 """Make sure the BioSQL objects implement the expected biopython interface.""" 429 430 def setUp(self): 431 """Load a database.""" 432 load_database("GenBank/cor6_6.gb") 433 434 self.server = BioSeqDatabase.open_database( 435 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 436 ) 437 self.db = self.server["biosql-test"] 438 self.item = self.db.lookup(accession="X62281") 439 self.item2 = self.db.lookup(accession="AJ237582") 440 441 def tearDown(self): 442 self.server.close() 443 destroy_database() 444 del self.db 445 del self.item 446 del self.server 447 448 def test_seq_record(self): 449 """Make sure SeqRecords from BioSQL implement the right interface.""" 450 test_record = self.item 451 self.assertIsInstance(test_record.seq, Seq) 452 self.assertEqual(test_record.id, "X62281.1", test_record.id) 453 self.assertEqual(test_record.name, "ATKIN2") 454 self.assertEqual(test_record.description, "A.thaliana kin2 gene") 455 self.assertTrue(hasattr(test_record, "annotations")) 456 # XXX should do something with annotations once they are like 457 # a dictionary 458 for feature in test_record.features: 459 self.assertIsInstance(feature, SeqFeature) 460 # shouldn't cause any errors! 461 self.assertIsInstance(str(test_record), str) 462 # Confirm can delete annotations etc to test these properties 463 del test_record.annotations 464 del test_record.dbxrefs 465 del test_record.features 466 del test_record.seq 467 468 def test_seq(self): 469 """Make sure Seqs from BioSQL implement the right interface.""" 470 test_seq = self.item.seq 471 string_rep = str(test_seq) 472 self.assertEqual(string_rep, str(test_seq)) # check __str__ too 473 self.assertEqual(type(string_rep), type("")) 474 self.assertEqual(len(test_seq), 880) 475 self.assertEqual(test_seq[879], "A") 476 self.assertEqual(test_seq[-1], "A") 477 self.assertEqual(test_seq[0], "A") 478 self.assertEqual(test_seq[-880], "A") 479 self.assertRaises(IndexError, test_seq.__getitem__, 880) 480 self.assertRaises(IndexError, test_seq.__getitem__, -881) 481 self.assertRaises(TypeError, test_seq.__getitem__, None) 482 483 def test_convert(self): 484 """Check can turn a Seq object from BioSQL into a Seq or MutableSeq.""" 485 test_seq = self.item.seq 486 487 other = Seq(test_seq) 488 self.assertEqual(test_seq, other) 489 self.assertIsInstance(other, Seq) 490 491 other = MutableSeq(test_seq) 492 self.assertEqual(test_seq, other) 493 self.assertIsInstance(other, MutableSeq) 494 495 def test_addition(self): 496 """Check can add Seq objects from BioSQL together.""" 497 test_seq = self.item.seq 498 for other in [ 499 Seq("ACGT"), 500 MutableSeq("ACGT"), 501 "ACGT", 502 test_seq, 503 ]: 504 test = test_seq + other 505 self.assertEqual(test, str(test_seq) + str(other)) 506 self.assertIsInstance(test, Seq) 507 test = other + test_seq 508 self.assertEqual(test, str(other) + str(test_seq)) 509 510 def test_multiplication(self): 511 """Check can multiply Seq objects from BioSQL by integers.""" 512 test_seq = self.item.seq 513 tripled = test_seq * 3 514 # Test Seq.__mul__ 515 self.assertIsInstance(tripled, Seq) 516 self.assertEqual(tripled, str(test_seq) * 3) 517 # Test Seq.__rmul__ 518 tripled = 3 * test_seq 519 self.assertIsInstance(tripled, Seq) 520 self.assertEqual(tripled, str(test_seq) * 3) 521 # Test Seq.__imul__ 522 original = self.item.seq 523 tripled = test_seq 524 tripled *= 3 525 self.assertIsInstance(tripled, Seq) 526 self.assertEqual(tripled, str(original) * 3) 527 528 def test_seq_slicing(self): 529 """Check that slices of sequences are retrieved properly.""" 530 test_seq = self.item.seq 531 new_seq = test_seq[:10] 532 self.assertIsInstance(new_seq, Seq) 533 # simple slicing 534 self.assertEqual(test_seq[:5], "ATTTG") 535 self.assertEqual(test_seq[0:5], "ATTTG") 536 self.assertEqual(test_seq[2:3], "T") 537 self.assertEqual(test_seq[2:4], "TT") 538 self.assertEqual(test_seq[870:], "TTGAATTATA") 539 # getting more fancy 540 self.assertEqual(test_seq[-1], "A") 541 self.assertEqual(test_seq[1], "T") 542 self.assertEqual(test_seq[-10:][5:], "TTATA") 543 544 def test_record_slicing(self): 545 """Check that slices of DBSeqRecord are retrieved properly.""" 546 new_rec = self.item[400:] 547 self.assertIsInstance(new_rec, SeqRecord) 548 self.assertEqual(len(new_rec), 480) 549 self.assertEqual(len(new_rec.features), 5) 550 551 def test_seq_features(self): 552 """Check SeqFeatures of a sequence.""" 553 test_features = self.item.features 554 cds_feature = test_features[6] 555 self.assertEqual(cds_feature.type, "CDS") 556 self.assertEqual( 557 str(cds_feature.location), "join{[103:160](+), [319:390](+), [503:579](+)}" 558 ) 559 560 msg = "Missing expected entries, have %r" % cds_feature.qualifiers 561 self.assertIn("gene", cds_feature.qualifiers) 562 self.assertIn("protein_id", cds_feature.qualifiers) 563 self.assertIn("codon_start", cds_feature.qualifiers) 564 self.assertEqual(cds_feature.qualifiers.get("gene"), ["kin2"]) 565 self.assertEqual(cds_feature.qualifiers.get("protein_id"), ["CAA44171.1"]) 566 self.assertEqual(cds_feature.qualifiers.get("codon_start"), ["1"]) 567 568 self.assertIn("db_xref", cds_feature.qualifiers) 569 multi_ann = cds_feature.qualifiers["db_xref"] 570 self.assertEqual(len(multi_ann), 2) 571 self.assertIn("GI:16354", multi_ann) 572 self.assertIn("SWISS-PROT:P31169", multi_ann) 573 574 def test_eq(self): 575 seq1 = self.item.seq 576 seq2 = self.item2.seq 577 self.assertEqual(seq1[30:32], seq2[3:5]) 578 self.assertEqual(seq1[30:32], "CA") 579 self.assertEqual(seq2[3:5], "CA") 580 self.assertEqual(seq1[30:32], b"CA") 581 self.assertEqual(seq2[3:5], b"CA") 582 self.assertEqual(seq1[30:32], Seq("CA")) 583 self.assertEqual(seq2[3:5], Seq("CA")) 584 self.assertEqual(seq1[30:32], MutableSeq("CA")) 585 self.assertEqual(seq2[3:5], MutableSeq("CA")) 586 self.assertEqual(seq2[3:5], seq1[30:32]) 587 self.assertEqual("CA", seq1[30:32]) 588 self.assertEqual("CA", seq2[3:5]) 589 self.assertEqual(b"CA", seq1[30:32]) 590 self.assertEqual(b"CA", seq2[3:5]) 591 self.assertEqual(Seq("CA"), seq1[30:32]) 592 self.assertEqual(Seq("CA"), seq2[3:5]) 593 self.assertEqual(MutableSeq("CA"), seq1[30:32]) 594 self.assertEqual(MutableSeq("CA"), seq2[3:5]) 595 with self.assertRaises(UndefinedSequenceError): 596 seq1 == Seq(None, len(seq1)) 597 with self.assertRaises(UndefinedSequenceError): 598 seq2 == Seq(None, len(seq2)) 599 with self.assertRaises(UndefinedSequenceError): 600 seq1 == Seq(None, 10) 601 with self.assertRaises(UndefinedSequenceError): 602 seq2 == Seq(None, 10) 603 with self.assertRaises(UndefinedSequenceError): 604 Seq(None, len(seq1)) == seq1 605 with self.assertRaises(UndefinedSequenceError): 606 Seq(None, len(seq2)) == seq2 607 with self.assertRaises(UndefinedSequenceError): 608 Seq(None, 10) == seq1 609 with self.assertRaises(UndefinedSequenceError): 610 Seq(None, 10) == seq2 611 612 def test_ne(self): 613 seq1 = self.item.seq 614 seq2 = self.item2.seq 615 self.assertNotEqual(seq1, seq2) 616 self.assertNotEqual(seq1, "CA") 617 self.assertNotEqual(seq2, "CA") 618 self.assertNotEqual(seq1, b"CA") 619 self.assertNotEqual(seq2, b"CA") 620 self.assertNotEqual(seq1, Seq("CA")) 621 self.assertNotEqual(seq2, Seq("CA")) 622 self.assertNotEqual(seq1, MutableSeq("CA")) 623 self.assertNotEqual(seq2, MutableSeq("CA")) 624 self.assertNotEqual(seq1[30:32], "GG") 625 self.assertNotEqual(seq2[3:5], "GG") 626 self.assertNotEqual(seq1[30:32], b"GG") 627 self.assertNotEqual(seq2[3:5], b"GG") 628 self.assertNotEqual(seq1[30:32], Seq("GG")) 629 self.assertNotEqual(seq2[3:5], Seq("GG")) 630 self.assertNotEqual(seq1[30:32], MutableSeq("GG")) 631 self.assertNotEqual(seq2[3:5], MutableSeq("GG")) 632 self.assertNotEqual(seq2, seq1) 633 self.assertNotEqual("CA", seq1) 634 self.assertNotEqual("CA", seq2) 635 self.assertNotEqual(b"CA", seq1) 636 self.assertNotEqual(b"CA", seq2) 637 self.assertNotEqual(Seq("CA"), seq1) 638 self.assertNotEqual(Seq("CA"), seq2) 639 self.assertNotEqual(MutableSeq("CA"), seq1) 640 self.assertNotEqual(MutableSeq("CA"), seq2) 641 self.assertNotEqual("GG", seq1[30:32]) 642 self.assertNotEqual("GG", seq2[3:5]) 643 self.assertNotEqual(b"GG", seq1[30:32]) 644 self.assertNotEqual(b"GG", seq2[3:5]) 645 self.assertNotEqual(Seq("GG"), seq1[30:32]) 646 self.assertNotEqual(Seq("GG"), seq2[3:5]) 647 self.assertNotEqual(MutableSeq("GG"), seq1[30:32]) 648 self.assertNotEqual(MutableSeq("GG"), seq2[3:5]) 649 with self.assertRaises(UndefinedSequenceError): 650 seq1 != Seq(None, len(seq1)) 651 with self.assertRaises(UndefinedSequenceError): 652 seq2 != Seq(None, len(seq2)) 653 with self.assertRaises(UndefinedSequenceError): 654 seq1 != Seq(None, 10) 655 with self.assertRaises(UndefinedSequenceError): 656 seq2 != Seq(None, 10) 657 with self.assertRaises(UndefinedSequenceError): 658 Seq(None, len(seq1)) != seq1 659 with self.assertRaises(UndefinedSequenceError): 660 Seq(None, len(seq2)) != seq2 661 with self.assertRaises(UndefinedSequenceError): 662 Seq(None, 10) != seq1 663 with self.assertRaises(UndefinedSequenceError): 664 Seq(None, 10) != seq2 665 666 def test_lt(self): 667 seq1 = self.item.seq 668 seq2 = self.item2.seq 669 self.assertLess(seq1, seq2) 670 self.assertLess(seq1, "CC") 671 self.assertLess("CC", seq2) 672 self.assertLess(seq1, b"CC") 673 self.assertLess(b"CC", seq2) 674 self.assertLess(seq1, Seq("CC")) 675 self.assertLess(Seq("CC"), seq2) 676 self.assertLess(seq1, MutableSeq("CC")) 677 self.assertLess(MutableSeq("CC"), seq2) 678 self.assertLess("AA", seq1) 679 self.assertLess("AA", seq2) 680 self.assertLess(b"AA", seq1) 681 self.assertLess(b"AA", seq2) 682 self.assertLess(Seq("AA"), seq1) 683 self.assertLess(Seq("AA"), seq2) 684 self.assertLess(MutableSeq("AA"), seq1) 685 self.assertLess(MutableSeq("AA"), seq2) 686 self.assertLess(seq1, "TT") 687 self.assertLess(seq2, "TT") 688 self.assertLess(seq1, b"TT") 689 self.assertLess(seq2, b"TT") 690 self.assertLess(seq1, Seq("TT")) 691 self.assertLess(seq2, Seq("TT")) 692 self.assertLess(seq1, MutableSeq("TT")) 693 self.assertLess(seq2, MutableSeq("TT")) 694 with self.assertRaises(UndefinedSequenceError): 695 seq1 < Seq(None, len(seq1)) 696 with self.assertRaises(UndefinedSequenceError): 697 seq2 < Seq(None, len(seq2)) 698 with self.assertRaises(UndefinedSequenceError): 699 seq1 < Seq(None, 10) 700 with self.assertRaises(UndefinedSequenceError): 701 seq2 < Seq(None, 10) 702 self.assertLess("AA", seq1[30:32]) 703 self.assertLess("AA", seq2[3:5]) 704 self.assertLess(b"AA", seq1[30:32]) 705 self.assertLess(b"AA", seq2[3:5]) 706 self.assertLess(seq1[30:32], seq2[3:7]) 707 self.assertLess(Seq("AA"), seq1[30:32]) 708 self.assertLess(Seq("AA"), seq2[3:5]) 709 self.assertLess(MutableSeq("AA"), seq1[30:32]) 710 self.assertLess(MutableSeq("AA"), seq2[3:5]) 711 self.assertLess(seq1[30:32], "TT") 712 self.assertLess(seq2[3:5], "TT") 713 self.assertLess(seq1[30:32], b"TT") 714 self.assertLess(seq2[3:5], b"TT") 715 self.assertLess(seq1[30:32], Seq("TT")) 716 self.assertLess(seq2[3:5], Seq("TT")) 717 self.assertLess(seq1[30:32], MutableSeq("TT")) 718 self.assertLess(seq2[3:5], MutableSeq("TT")) 719 720 def test_le(self): 721 seq1 = self.item.seq 722 seq2 = self.item2.seq 723 self.assertLessEqual(seq1, seq2) 724 self.assertLessEqual(seq1, "CC") 725 self.assertLessEqual("CC", seq2) 726 self.assertLessEqual(seq1, b"CC") 727 self.assertLessEqual(b"CC", seq2) 728 self.assertLessEqual(seq1, Seq("CC")) 729 self.assertLessEqual(Seq("CC"), seq2) 730 self.assertLessEqual(seq1, MutableSeq("CC")) 731 self.assertLessEqual(MutableSeq("CC"), seq2) 732 self.assertLessEqual("AA", seq1) 733 self.assertLessEqual("AA", seq2) 734 self.assertLessEqual(b"AA", seq1) 735 self.assertLessEqual(b"AA", seq2) 736 self.assertLessEqual(Seq("AA"), seq1) 737 self.assertLessEqual(Seq("AA"), seq2) 738 self.assertLessEqual(MutableSeq("AA"), seq1) 739 self.assertLessEqual(MutableSeq("AA"), seq2) 740 self.assertLessEqual(seq1, "TT") 741 self.assertLessEqual(seq2, "TT") 742 self.assertLessEqual(seq1, b"TT") 743 self.assertLessEqual(seq2, b"TT") 744 self.assertLessEqual(seq1, Seq("TT")) 745 self.assertLessEqual(seq2, Seq("TT")) 746 self.assertLessEqual(seq1, MutableSeq("TT")) 747 self.assertLessEqual(seq2, MutableSeq("TT")) 748 with self.assertRaises(UndefinedSequenceError): 749 seq1 < Seq(None, len(seq1)) 750 with self.assertRaises(UndefinedSequenceError): 751 seq2 < Seq(None, len(seq2)) 752 with self.assertRaises(UndefinedSequenceError): 753 seq1 < Seq(None, 10) 754 with self.assertRaises(UndefinedSequenceError): 755 seq2 < Seq(None, 10) 756 self.assertLessEqual("AA", seq1[30:32]) 757 self.assertLessEqual("AA", seq2[3:5]) 758 self.assertLessEqual(b"AA", seq1[30:32]) 759 self.assertLessEqual(b"AA", seq2[3:5]) 760 self.assertLessEqual(seq1[30:32], seq2[3:7]) 761 self.assertLessEqual(Seq("AA"), seq1[30:32]) 762 self.assertLessEqual(Seq("AA"), seq2[3:5]) 763 self.assertLessEqual(MutableSeq("AA"), seq1[30:32]) 764 self.assertLessEqual(MutableSeq("AA"), seq2[3:5]) 765 self.assertLessEqual(seq1[30:32], "TT") 766 self.assertLessEqual(seq2[3:5], "TT") 767 self.assertLessEqual(seq1[30:32], b"TT") 768 self.assertLessEqual(seq2[3:5], b"TT") 769 self.assertLessEqual(seq1[30:32], Seq("TT")) 770 self.assertLessEqual(seq2[3:5], Seq("TT")) 771 self.assertLessEqual(seq1[30:32], MutableSeq("TT")) 772 self.assertLessEqual(seq2[3:5], MutableSeq("TT")) 773 774 def test_gt(self): 775 seq1 = self.item.seq 776 seq2 = self.item2.seq 777 self.assertGreater(seq2, seq1) 778 self.assertGreater("CC", seq1) 779 self.assertGreater(seq2, "CC") 780 self.assertGreater(b"CC", seq1) 781 self.assertGreater(seq2, b"CC") 782 self.assertGreater(Seq("CC"), seq1) 783 self.assertGreater(seq2, Seq("CC")) 784 self.assertGreater(MutableSeq("CC"), seq1) 785 self.assertGreater(seq2, MutableSeq("CC")) 786 self.assertGreater(seq1, "AA") 787 self.assertGreater(seq2, "AA") 788 self.assertGreater(seq1, b"AA") 789 self.assertGreater(seq2, b"AA") 790 self.assertGreater(seq1, Seq("AA")) 791 self.assertGreater(seq2, Seq("AA")) 792 self.assertGreater(seq1, MutableSeq("AA")) 793 self.assertGreater(seq2, MutableSeq("AA")) 794 self.assertGreater("TT", seq1) 795 self.assertGreater("TT", seq2) 796 self.assertGreater(b"TT", seq1) 797 self.assertGreater(b"TT", seq2) 798 self.assertGreater(Seq("TT"), seq1) 799 self.assertGreater(Seq("TT"), seq2) 800 self.assertGreater(MutableSeq("TT"), seq1) 801 self.assertGreater(MutableSeq("TT"), seq2) 802 with self.assertRaises(UndefinedSequenceError): 803 seq1 < Seq(None, len(seq1)) 804 with self.assertRaises(UndefinedSequenceError): 805 seq2 < Seq(None, len(seq2)) 806 with self.assertRaises(UndefinedSequenceError): 807 seq1 < Seq(None, 10) 808 with self.assertRaises(UndefinedSequenceError): 809 seq2 < Seq(None, 10) 810 self.assertGreater(seq1[30:32], "AA") 811 self.assertGreater(seq2[3:5], "AA") 812 self.assertGreater(seq1[30:32], b"AA") 813 self.assertGreater(seq2[3:5], b"AA") 814 self.assertGreater(seq1[30:34], seq2[3:5]) 815 self.assertGreater(seq1[30:32], Seq("AA")) 816 self.assertGreater(seq2[3:5], Seq("AA")) 817 self.assertGreater(seq1[30:32], MutableSeq("AA")) 818 self.assertGreater(seq2[3:5], MutableSeq("AA")) 819 self.assertGreater("TT", seq1[30:32]) 820 self.assertGreater("TT", seq2[3:5]) 821 self.assertGreater(b"TT", seq1[30:32]) 822 self.assertGreater(b"TT", seq2[3:5]) 823 self.assertGreater(Seq("TT"), seq1[30:32]) 824 self.assertGreater(Seq("TT"), seq2[3:5]) 825 self.assertGreater(MutableSeq("TT"), seq1[30:32]) 826 self.assertGreater(MutableSeq("TT"), seq2[3:5]) 827 828 def test_ge(self): 829 seq1 = self.item.seq 830 seq2 = self.item2.seq 831 self.assertGreaterEqual(seq2, seq1) 832 self.assertGreaterEqual("CC", seq1) 833 self.assertGreaterEqual(seq2, "CC") 834 self.assertGreaterEqual(b"CC", seq1) 835 self.assertGreaterEqual(seq2, b"CC") 836 self.assertGreaterEqual(Seq("CC"), seq1) 837 self.assertGreaterEqual(seq2, Seq("CC")) 838 self.assertGreaterEqual(MutableSeq("CC"), seq1) 839 self.assertGreaterEqual(seq2, MutableSeq("CC")) 840 self.assertGreaterEqual(seq1, "AA") 841 self.assertGreaterEqual(seq2, "AA") 842 self.assertGreaterEqual(seq1, b"AA") 843 self.assertGreaterEqual(seq2, b"AA") 844 self.assertGreaterEqual(seq1, Seq("AA")) 845 self.assertGreaterEqual(seq2, Seq("AA")) 846 self.assertGreaterEqual(seq1, MutableSeq("AA")) 847 self.assertGreaterEqual(seq2, MutableSeq("AA")) 848 self.assertGreaterEqual("TT", seq1) 849 self.assertGreaterEqual("TT", seq2) 850 self.assertGreaterEqual(b"TT", seq1) 851 self.assertGreaterEqual(b"TT", seq2) 852 self.assertGreaterEqual(Seq("TT"), seq1) 853 self.assertGreaterEqual(Seq("TT"), seq2) 854 self.assertGreaterEqual(MutableSeq("TT"), seq1) 855 self.assertGreaterEqual(MutableSeq("TT"), seq2) 856 with self.assertRaises(UndefinedSequenceError): 857 seq1 < Seq(None, len(seq1)) 858 with self.assertRaises(UndefinedSequenceError): 859 seq2 < Seq(None, len(seq2)) 860 with self.assertRaises(UndefinedSequenceError): 861 seq1 < Seq(None, 10) 862 with self.assertRaises(UndefinedSequenceError): 863 seq2 < Seq(None, 10) 864 self.assertGreaterEqual(seq1[30:32], "AA") 865 self.assertGreaterEqual(seq2[3:5], "AA") 866 self.assertGreaterEqual(seq1[30:32], b"AA") 867 self.assertGreaterEqual(seq2[3:5], b"AA") 868 self.assertGreaterEqual(seq1[30:34], seq2[3:5]) 869 self.assertGreaterEqual(seq1[30:32], Seq("AA")) 870 self.assertGreaterEqual(seq2[3:5], Seq("AA")) 871 self.assertGreaterEqual(seq1[30:32], MutableSeq("AA")) 872 self.assertGreaterEqual(seq2[3:5], MutableSeq("AA")) 873 self.assertGreaterEqual("TT", seq1[30:32]) 874 self.assertGreaterEqual("TT", seq2[3:5]) 875 self.assertGreaterEqual(b"TT", seq1[30:32]) 876 self.assertGreaterEqual(b"TT", seq2[3:5]) 877 self.assertGreaterEqual(Seq("TT"), seq1[30:32]) 878 self.assertGreaterEqual(Seq("TT"), seq2[3:5]) 879 self.assertGreaterEqual(MutableSeq("TT"), seq1[30:32]) 880 self.assertGreaterEqual(MutableSeq("TT"), seq2[3:5]) 881 882 883class LoaderTest(unittest.TestCase): 884 """Load a database from a GenBank file.""" 885 886 def setUp(self): 887 # create TESTDB 888 TESTDB = create_database() 889 890 # load the database 891 db_name = "biosql-test" 892 self.server = BioSeqDatabase.open_database( 893 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 894 ) 895 896 # remove the database if it already exists 897 try: 898 self.server[db_name] 899 self.server.remove_database(db_name) 900 except KeyError: 901 pass 902 903 self.db = self.server.new_database(db_name) 904 905 # get the GenBank file we are going to put into it 906 self.iterator = SeqIO.parse("GenBank/cor6_6.gb", "gb") 907 908 def tearDown(self): 909 self.server.close() 910 destroy_database() 911 del self.db 912 del self.server 913 914 def test_load_database(self): 915 """Load SeqRecord objects into a BioSQL database.""" 916 self.db.load(self.iterator) 917 918 # do some simple tests to make sure we actually loaded the right 919 # thing. More advanced tests in a different module. 920 items = list(self.db.values()) 921 self.assertEqual(len(items), 6) 922 self.assertEqual(len(self.db), 6) 923 item_names = [] 924 item_ids = [] 925 for item in items: 926 item_names.append(item.name) 927 item_ids.append(item.id) 928 item_names.sort() 929 item_ids.sort() 930 self.assertEqual( 931 item_names, 932 ["AF297471", "ARU237582", "ATCOR66M", "ATKIN2", "BNAKINI", "BRRBIF72"], 933 ) 934 self.assertEqual( 935 item_ids, 936 [ 937 "AF297471.1", 938 "AJ237582.1", 939 "L31939.1", 940 "M81224.1", 941 "X55053.1", 942 "X62281.1", 943 ], 944 ) 945 946 947class DeleteTest(unittest.TestCase): 948 """Test proper deletion of entries from a database.""" 949 950 loaded_db = 0 951 952 def setUp(self): 953 """Connect to and load up the database.""" 954 load_database("GenBank/cor6_6.gb") 955 956 self.server = BioSeqDatabase.open_database( 957 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 958 ) 959 960 self.db = self.server["biosql-test"] 961 962 def tearDown(self): 963 self.server.close() 964 destroy_database() 965 del self.db 966 del self.server 967 968 def test_server(self): 969 """Check BioSeqDatabase methods.""" 970 server = self.server 971 self.assertIn("biosql-test", server) 972 self.assertEqual(1, len(server)) 973 self.assertEqual(["biosql-test"], list(server.keys())) 974 # Check we can delete the namespace... 975 del server["biosql-test"] 976 self.assertEqual(0, len(server)) 977 with self.assertRaises(KeyError): 978 del server["non-existant-name"] 979 980 def test_del_db_items(self): 981 """Check all associated data is deleted from an item.""" 982 db = self.db 983 items = list(db.values()) 984 keys = list(db) 985 length = len(items) 986 987 for seq_id in keys: 988 sql = "SELECT seqfeature_id from seqfeature where bioentry_id = '%s'" 989 # get the original number of seqfeatures associated with the bioentry 990 seqfeatures = self.db.adaptor.execute_and_fetchall(sql % (seq_id)) 991 992 del db[seq_id] 993 # check to see that the entry in the bioentry table is removed 994 self.assertNotIn(seq_id, db) 995 996 # no need to check seqfeature presence if it had none to begin with 997 if len(seqfeatures): 998 rows_d = self.db.adaptor.execute_and_fetchall(sql % (seq_id)) 999 # check to see that associated data is removed 1000 self.assertEqual(len(rows_d), 0) 1001 1002 self.assertEqual(0, len(list(db.values()))) 1003 1004 1005class DupLoadTest(unittest.TestCase): 1006 """Check a few duplicate conditions fail.""" 1007 1008 def setUp(self): 1009 # drop any old database and create a new one: 1010 TESTDB = create_database() 1011 # connect to new database: 1012 self.server = BioSeqDatabase.open_database( 1013 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 1014 ) 1015 # Create new namespace within new empty database: 1016 self.db = self.server.new_database("biosql-test") 1017 1018 def tearDown(self): 1019 self.server.rollback() 1020 self.server.close() 1021 destroy_database() 1022 del self.db 1023 del self.server 1024 1025 def test_duplicate_load(self): 1026 """Make sure can't import a single record twice (in one go).""" 1027 record = SeqRecord( 1028 Seq("ATGCTATGACTAT"), id="Test1", annotations={"molecule_type": "DNA"} 1029 ) 1030 with self.assertRaises(Exception) as cm: 1031 self.db.load([record, record]) 1032 err = cm.exception 1033 # Note we check for a specific exception because the exception 1034 # class will depend on which DB back end is in use. 1035 self.assertIn( 1036 err.__class__.__name__, 1037 ["IntegrityError", "UniqueViolation", "AttributeError", "OperationalError"], 1038 ) 1039 1040 def test_duplicate_load2(self): 1041 """Make sure can't import a single record twice (in steps).""" 1042 record = SeqRecord( 1043 Seq("ATGCTATGACTAT"), id="Test2", annotations={"molecule_type": "DNA"} 1044 ) 1045 count = self.db.load([record]) 1046 self.assertEqual(count, 1) 1047 with self.assertRaises(Exception) as cm: 1048 self.db.load([record]) 1049 err = cm.exception 1050 # Note we check for a specific exception because the exception 1051 # class will depend on which DB back end is in use. 1052 self.assertIn( 1053 err.__class__.__name__, 1054 ["IntegrityError", "UniqueViolation", "AttributeError"], 1055 ) 1056 1057 def test_duplicate_id_load(self): 1058 """Make sure can't import records with same ID (in one go).""" 1059 record1 = SeqRecord( 1060 Seq("ATGCTATGACTAT"), id="TestA", annotations={"molecule_type": "DNA"} 1061 ) 1062 record2 = SeqRecord( 1063 Seq("GGGATGCGACTAT"), id="TestA", annotations={"molecule_type": "DNA"} 1064 ) 1065 with self.assertRaises(Exception) as cm: 1066 self.db.load([record1, record2]) 1067 err = cm.exception 1068 # Note we check for a specific exception because the exception 1069 # class will depend on which DB back end is in use. 1070 self.assertIn( 1071 err.__class__.__name__, 1072 ["IntegrityError", "UniqueViolation", "AttributeError"], 1073 ) 1074 1075 1076class ClosedLoopTest(SeqRecordTestBaseClass): 1077 """Test file -> BioSQL -> file.""" 1078 1079 @classmethod 1080 def setUpClass(cls): 1081 # NOTE - For speed I don't bother to create a new database each time, 1082 # simply a new unique namespace is used for each test. 1083 TESTDB = create_database() 1084 1085 def test_NC_005816(self): 1086 """From GenBank file to BioSQL and back to a GenBank file, NC_005816.""" 1087 with warnings.catch_warnings(): 1088 # BiopythonWarning: order location operators are not fully supported 1089 warnings.simplefilter("ignore", BiopythonWarning) 1090 self.loop("GenBank/NC_005816.gb", "gb") 1091 1092 def test_NC_000932(self): 1093 """From GenBank file to BioSQL and back to a GenBank file, NC_000932.""" 1094 self.loop("GenBank/NC_000932.gb", "gb") 1095 1096 def test_NT_019265(self): 1097 """From GenBank file to BioSQL and back to a GenBank file, NT_019265.""" 1098 self.loop("GenBank/NT_019265.gb", "gb") 1099 1100 def test_protein_refseq2(self): 1101 """From GenBank file to BioSQL and back to a GenBank file, protein_refseq2.""" 1102 with warnings.catch_warnings(): 1103 # BiopythonWarning: order location operators are not fully supported 1104 warnings.simplefilter("ignore", BiopythonWarning) 1105 self.loop("GenBank/protein_refseq2.gb", "gb") 1106 1107 def test_no_ref(self): 1108 """From GenBank file to BioSQL and back to a GenBank file, noref.""" 1109 self.loop("GenBank/noref.gb", "gb") 1110 1111 def test_one_of(self): 1112 """From GenBank file to BioSQL and back to a GenBank file, one_of.""" 1113 self.loop("GenBank/one_of.gb", "gb") 1114 1115 def test_cor6_6(self): 1116 """From GenBank file to BioSQL and back to a GenBank file, cor6_6.""" 1117 self.loop("GenBank/cor6_6.gb", "gb") 1118 1119 def test_arab1(self): 1120 """From GenBank file to BioSQL and back to a GenBank file, arab1.""" 1121 self.loop("GenBank/arab1.gb", "gb") 1122 1123 def loop(self, filename, format): 1124 original_records = [] 1125 for record in SeqIO.parse(filename, format): 1126 if "RNA" in record.annotations.get("molecule_type", ""): 1127 if "U" in record.seq: 1128 record.annotations["molecule_type"] = "RNA" 1129 else: 1130 record.annotations["molecule_type"] = "DNA" 1131 original_records.append(record) 1132 # now open a connection to load the database 1133 server = BioSeqDatabase.open_database( 1134 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 1135 ) 1136 db_name = "test_loop_%s" % filename # new namespace! 1137 db = server.new_database(db_name) 1138 count = db.load(original_records) 1139 self.assertEqual(count, len(original_records)) 1140 server.commit() 1141 # Now read them back... 1142 biosql_records = [db.lookup(name=rec.name) for rec in original_records] 1143 # And check they agree 1144 self.compare_records(original_records, biosql_records) 1145 # Now write to a handle... 1146 handle = StringIO() 1147 SeqIO.write(biosql_records, handle, "gb") 1148 # Now read them back... 1149 handle.seek(0) 1150 new_records = list(SeqIO.parse(handle, "gb")) 1151 # And check they still agree 1152 self.assertEqual(len(new_records), len(original_records)) 1153 for old, new in zip(original_records, new_records): 1154 # TODO - remove this hack because we don't yet write these (yet): 1155 for key in ["comment", "references"]: 1156 if key in old.annotations and key not in new.annotations: 1157 del old.annotations[key] 1158 self.compare_record(old, new) 1159 # Done 1160 handle.close() 1161 server.close() 1162 1163 1164class TransferTest(SeqRecordTestBaseClass): 1165 """Test file -> BioSQL, BioSQL -> BioSQL.""" 1166 1167 # NOTE - For speed I don't bother to create a new database each time, 1168 # simply a new unique namespace is used for each test. 1169 1170 def setUp(self): 1171 TESTDB = create_database() 1172 1173 def test_NC_005816(self): 1174 """From GenBank file to BioSQL, then again to a new namespace, NC_005816.""" 1175 with warnings.catch_warnings(): 1176 # BiopythonWarning: order location operators are not fully supported 1177 warnings.simplefilter("ignore", BiopythonWarning) 1178 self.trans("GenBank/NC_005816.gb", "gb") 1179 1180 def test_NC_000932(self): 1181 """From GenBank file to BioSQL, then again to a new namespace, NC_000932.""" 1182 self.trans("GenBank/NC_000932.gb", "gb") 1183 1184 def test_NT_019265(self): 1185 """From GenBank file to BioSQL, then again to a new namespace, NT_019265.""" 1186 self.trans("GenBank/NT_019265.gb", "gb") 1187 1188 def test_protein_refseq2(self): 1189 """From GenBank file to BioSQL, then again to a new namespace, protein_refseq2.""" 1190 with warnings.catch_warnings(): 1191 # BiopythonWarning: order location operators are not fully supported 1192 warnings.simplefilter("ignore", BiopythonWarning) 1193 self.trans("GenBank/protein_refseq2.gb", "gb") 1194 1195 def test_no_ref(self): 1196 """From GenBank file to BioSQL, then again to a new namespace, noref.""" 1197 self.trans("GenBank/noref.gb", "gb") 1198 1199 def test_one_of(self): 1200 """From GenBank file to BioSQL, then again to a new namespace, one_of.""" 1201 self.trans("GenBank/one_of.gb", "gb") 1202 1203 def test_cor6_6(self): 1204 """From GenBank file to BioSQL, then again to a new namespace, cor6_6.""" 1205 self.trans("GenBank/cor6_6.gb", "gb") 1206 1207 def test_arab1(self): 1208 """From GenBank file to BioSQL, then again to a new namespace, arab1.""" 1209 self.trans("GenBank/arab1.gb", "gb") 1210 1211 def trans(self, filename, format): 1212 original_records = [] 1213 for record in SeqIO.parse(filename, format): 1214 if record.annotations.get("molecule_type") == "mRNA": 1215 record.annotations["molecule_type"] = "DNA" 1216 original_records.append(record) 1217 # now open a connection to load the database 1218 server = BioSeqDatabase.open_database( 1219 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 1220 ) 1221 db_name = "test_trans1_%s" % filename # new namespace! 1222 db = server.new_database(db_name) 1223 count = db.load(original_records) 1224 self.assertEqual(count, len(original_records)) 1225 server.commit() 1226 # Now read them back... 1227 biosql_records = [db.lookup(name=rec.name) for rec in original_records] 1228 # And check they agree 1229 self.compare_records(original_records, biosql_records) 1230 # Now write to a second name space... 1231 db_name = "test_trans2_%s" % filename # new namespace! 1232 db = server.new_database(db_name) 1233 count = db.load(biosql_records) 1234 self.assertEqual(count, len(original_records)) 1235 # Now read them back again, 1236 biosql_records2 = [db.lookup(name=rec.name) for rec in original_records] 1237 # And check they also agree 1238 self.compare_records(original_records, biosql_records2) 1239 # Done 1240 server.close() 1241 1242 def tearDown(self): 1243 destroy_database() 1244 1245 1246class InDepthLoadTest(unittest.TestCase): 1247 """Make sure we are loading and retreiving in a semi-lossless fashion.""" 1248 1249 def setUp(self): 1250 gb_file = os.path.join(os.getcwd(), "GenBank", "cor6_6.gb") 1251 load_database(gb_file) 1252 1253 self.server = BioSeqDatabase.open_database( 1254 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 1255 ) 1256 self.db = self.server["biosql-test"] 1257 1258 def tearDown(self): 1259 self.server.close() 1260 destroy_database() 1261 del self.db 1262 del self.server 1263 1264 def test_transfer(self): 1265 """Make sure can load record into another namespace.""" 1266 # Should be in database already... 1267 db_record = self.db.lookup(accession="X55053") 1268 # Make a new namespace 1269 db2 = self.server.new_database("biosql-test-alt") 1270 # Should be able to load this DBSeqRecord there... 1271 count = db2.load([db_record]) 1272 self.assertEqual(count, 1) 1273 1274 def test_reload(self): 1275 """Make sure can't reimport existing records.""" 1276 gb_file = os.path.join(os.getcwd(), "GenBank", "cor6_6.gb") 1277 with open(gb_file) as gb_handle: 1278 record = next(SeqIO.parse(gb_handle, "gb")) 1279 # Should be in database already... 1280 db_record = self.db.lookup(accession="X55053") 1281 self.assertEqual(db_record.id, record.id) 1282 self.assertEqual(db_record.name, record.name) 1283 self.assertEqual(db_record.description, record.description) 1284 self.assertEqual(db_record.seq, record.seq) 1285 # Good... now try reloading it! 1286 with self.assertRaises(Exception) as cm: 1287 self.db.load([record]) 1288 err = cm.exception 1289 # Note we check for a specific exception because the exception 1290 # class will depend on which DB back end is in use. 1291 self.assertIn( 1292 err.__class__.__name__, 1293 ["IntegrityError", "UniqueViolation", "AttributeError"], 1294 ) 1295 1296 def test_record_loading(self): 1297 """Make sure all records are correctly loaded.""" 1298 test_record = self.db.lookup(accession="X55053") 1299 self.assertEqual(test_record.name, "ATCOR66M") 1300 self.assertEqual(test_record.id, "X55053.1") 1301 self.assertEqual(test_record.description, "A.thaliana cor6.6 mRNA") 1302 self.assertEqual(test_record.annotations["molecule_type"], "DNA") 1303 self.assertEqual(test_record.seq[:20], "AACAAAACACACATCAAAAA") 1304 1305 test_record = self.db.lookup(accession="X62281") 1306 self.assertEqual(test_record.name, "ATKIN2") 1307 self.assertEqual(test_record.id, "X62281.1") 1308 self.assertEqual(test_record.description, "A.thaliana kin2 gene") 1309 self.assertEqual(test_record.annotations["molecule_type"], "DNA") 1310 self.assertEqual(test_record.seq[:10], "ATTTGGCCTA") 1311 1312 def test_seq_feature(self): 1313 """In depth check that SeqFeatures are transmitted through the db.""" 1314 test_record = self.db.lookup(accession="AJ237582") 1315 features = test_record.features 1316 self.assertEqual(len(features), 7) 1317 1318 # test single locations 1319 test_feature = features[0] 1320 self.assertEqual(test_feature.type, "source") 1321 self.assertEqual(str(test_feature.location), "[0:206](+)") 1322 self.assertEqual(len(test_feature.qualifiers), 3) 1323 self.assertEqual(test_feature.qualifiers["country"], ["Russia:Bashkortostan"]) 1324 self.assertEqual(test_feature.qualifiers["organism"], ["Armoracia rusticana"]) 1325 self.assertEqual(test_feature.qualifiers["db_xref"], ["taxon:3704"]) 1326 1327 # test split locations 1328 test_feature = features[4] 1329 self.assertEqual(test_feature.type, "CDS") 1330 self.assertEqual(str(test_feature.location), "join{[0:48](+), [142:206](+)}") 1331 self.assertEqual(len(test_feature.location.parts), 2) 1332 self.assertEqual(str(test_feature.location.parts[0]), "[0:48](+)") 1333 self.assertEqual(str(test_feature.location.parts[1]), "[142:206](+)") 1334 self.assertEqual(test_feature.location.operator, "join") 1335 self.assertEqual(len(test_feature.qualifiers), 6) 1336 self.assertEqual(test_feature.qualifiers["gene"], ["csp14"]) 1337 self.assertEqual(test_feature.qualifiers["codon_start"], ["2"]) 1338 self.assertEqual(test_feature.qualifiers["product"], ["cold shock protein"]) 1339 self.assertEqual(test_feature.qualifiers["protein_id"], ["CAB39890.1"]) 1340 self.assertEqual(test_feature.qualifiers["db_xref"], ["GI:4538893"]) 1341 self.assertEqual( 1342 test_feature.qualifiers["translation"], 1343 ["DKAKDAAAAAGASAQQAGKNISDAAAGGVNFVKEKTG"], 1344 ) 1345 1346 # test passing strand information 1347 # XXX We should be testing complement as well 1348 test_record = self.db.lookup(accession="AJ237582") 1349 test_feature = test_record.features[4] # DNA, no complement 1350 self.assertEqual(test_feature.strand, 1) 1351 for loc in test_feature.location.parts: 1352 self.assertEqual(loc.strand, 1) 1353 1354 test_record = self.db.lookup(accession="X55053") 1355 test_feature = test_record.features[0] 1356 # mRNA, so really cDNA, so the strand should be 1 (not complemented) 1357 self.assertEqual(test_feature.strand, 1) 1358 1359 1360##################################################################### 1361 1362 1363class AutoSeqIOTests(SeqRecordTestBaseClass): 1364 """Test SeqIO and BioSQL together.""" 1365 1366 server = None 1367 db = None 1368 1369 @classmethod 1370 def setUpClass(cls): 1371 # Create and reuse on database for all tests in this class 1372 TESTDB = create_database() 1373 1374 def setUp(self): 1375 """Connect to the database.""" 1376 db_name = "biosql-test-seqio" 1377 server = BioSeqDatabase.open_database( 1378 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 1379 ) 1380 self.server = server 1381 if db_name not in server: 1382 self.db = server.new_database(db_name) 1383 server.commit() 1384 self.db = self.server[db_name] 1385 1386 def tearDown(self): 1387 if self.db: 1388 del self.db 1389 if self.server: 1390 self.server.close() 1391 del self.server 1392 1393 def check(self, t_format, t_filename, t_count=1): 1394 db = self.db 1395 1396 records = [] 1397 for record in SeqIO.parse(t_filename, t_format): 1398 molecule_type = record.annotations.get("molecule_type") 1399 if molecule_type is not None: 1400 if "DNA" in molecule_type: 1401 record.annotations["molecule_type"] = "DNA" 1402 elif "RNA" in molecule_type: 1403 record.annotations["molecule_type"] = "RNA" 1404 elif "protein" in molecule_type: 1405 record.annotations["molecule_type"] = "protein" 1406 else: 1407 raise Exception("Unknown molecule type '%s'" % molecule_type) 1408 records.append(record) 1409 count = db.load(records) 1410 assert count == t_count 1411 self.server.commit() 1412 1413 for record in records: 1414 key = record.name 1415 # print(" - Retrieving by name/display_id '%s'," % key) 1416 db_rec = db.lookup(name=key) 1417 self.compare_record(record, db_rec) 1418 db_rec = db.lookup(display_id=key) 1419 self.compare_record(record, db_rec) 1420 1421 key = record.id 1422 if key.count(".") == 1 and key.split(".")[1].isdigit(): 1423 # print(" - Retrieving by version '%s'," % key) 1424 db_rec = db.lookup(version=key) 1425 self.compare_record(record, db_rec) 1426 1427 if "accessions" in record.annotations: 1428 # Only expect FIRST accession to work! 1429 key = record.annotations["accessions"][0] 1430 assert key, "Blank accession in annotation %r" % record.annotations 1431 if key != record.id: 1432 # print(" - Retrieving by accession '%s'," % key) 1433 db_rec = db.lookup(accession=key) 1434 self.compare_record(record, db_rec) 1435 1436 if "gi" in record.annotations: 1437 key = record.annotations["gi"] 1438 if key != record.id: 1439 # print(" - Retrieving by GI '%s'," % key) 1440 db_rec = db.lookup(primary_id=key) 1441 self.compare_record(record, db_rec) 1442 1443 def test_SeqIO_loading(self): 1444 self.check("fasta", "Fasta/lupine.nu") 1445 self.check("fasta", "Fasta/elderberry.nu") 1446 self.check("fasta", "Fasta/phlox.nu") 1447 self.check("fasta", "Fasta/centaurea.nu") 1448 self.check("fasta", "Fasta/wisteria.nu") 1449 self.check("fasta", "Fasta/sweetpea.nu") 1450 self.check("fasta", "Fasta/lavender.nu") 1451 self.check("fasta", "Fasta/aster.pro") 1452 self.check("fasta", "Fasta/loveliesbleeding.pro") 1453 self.check("fasta", "Fasta/rose.pro") 1454 self.check("fasta", "Fasta/rosemary.pro") 1455 self.check("fasta", "Fasta/f001") 1456 self.check("fasta", "Fasta/f002", 3) 1457 self.check("fasta", "Fasta/fa01", 2) 1458 self.check("fasta", "GFF/NC_001802.fna") 1459 self.check("fasta", "GFF/multi.fna", 3) 1460 self.check("fasta", "Registry/seqs.fasta", 2) 1461 self.check("swiss", "SwissProt/sp001") 1462 self.check("swiss", "SwissProt/sp002") 1463 self.check("swiss", "SwissProt/sp003") 1464 self.check("swiss", "SwissProt/P0A186.txt") 1465 self.check("swiss", "SwissProt/sp005") 1466 self.check("swiss", "SwissProt/sp006") 1467 self.check("swiss", "SwissProt/sp007") 1468 self.check("swiss", "SwissProt/sp008") 1469 self.check("swiss", "SwissProt/sp009") 1470 self.check("swiss", "SwissProt/sp010") 1471 self.check("swiss", "SwissProt/sp011") 1472 self.check("swiss", "SwissProt/sp012") 1473 self.check("swiss", "SwissProt/sp013") 1474 self.check("swiss", "SwissProt/P60137.txt") 1475 self.check("swiss", "SwissProt/sp015") 1476 self.check("swiss", "SwissProt/sp016") 1477 self.check("swiss", "Registry/EDD_RAT.dat") 1478 self.check("genbank", "GenBank/noref.gb") 1479 self.check("genbank", "GenBank/cor6_6.gb", 6) 1480 self.check("genbank", "GenBank/iro.gb") 1481 self.check("genbank", "GenBank/pri1.gb") 1482 self.check("genbank", "GenBank/arab1.gb") 1483 with warnings.catch_warnings(): 1484 # BiopythonWarning: order location operators are not fully 1485 # supported 1486 warnings.simplefilter("ignore", BiopythonWarning) 1487 self.check("genbank", "GenBank/protein_refseq2.gb") 1488 self.check("genbank", "GenBank/extra_keywords.gb") 1489 self.check("genbank", "GenBank/one_of.gb") 1490 self.check("genbank", "GenBank/NT_019265.gb") 1491 self.check("genbank", "GenBank/origin_line.gb") 1492 self.check("genbank", "GenBank/blank_seq.gb") 1493 with warnings.catch_warnings(): 1494 # BiopythonWarning: bond location operators are not fully supported 1495 warnings.simplefilter("ignore", BiopythonWarning) 1496 self.check("genbank", "GenBank/dbsource_wrap.gb") 1497 # BiopythonWarning: order location operators are not fully 1498 # supported 1499 self.check("genbank", "GenBank/NC_005816.gb") 1500 self.check("genbank", "GenBank/gbvrl1_start.seq", 3) 1501 self.check("genbank", "GFF/NC_001422.gbk") 1502 self.check("embl", "EMBL/TRBG361.embl") 1503 self.check("embl", "EMBL/DD231055_edited.embl") 1504 self.check("embl", "EMBL/SC10H5.embl") 1505 self.check("embl", "EMBL/U87107.embl") 1506 self.assertEqual(len(self.db), 66) 1507 1508 1509class SwissProtUnknownPositionTest(unittest.TestCase): 1510 """Handle SwissProt unknown position by setting value to null in database.""" 1511 1512 def setUp(self): 1513 # drop any old database and create a new one: 1514 TESTDB = create_database() 1515 # connect to new database: 1516 self.server = BioSeqDatabase.open_database( 1517 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 1518 ) 1519 # Create new namespace within new empty database: 1520 self.db = self.server.new_database("biosql-test") 1521 1522 def tearDown(self): 1523 self.server.rollback() 1524 self.server.close() 1525 destroy_database() 1526 del self.db 1527 del self.server 1528 1529 def test_ambiguous_location(self): 1530 """Loaded uniprot-xml with ambiguous location in BioSQL.""" 1531 id = "P97881" 1532 seqiter = SeqIO.parse("SwissProt/%s.xml" % id, "uniprot-xml") 1533 self.assertEqual(self.db.load(seqiter), 1) 1534 1535 dbrecord = self.db.lookup(primary_id=id) 1536 for feature in dbrecord.features: 1537 if feature.type == "signal peptide": 1538 self.assertIsInstance(feature.location.end, UnknownPosition) 1539 elif feature.type == "chain": 1540 self.assertIsInstance(feature.location.start, UnknownPosition) 1541 else: 1542 self.assertIsInstance(feature.location.start, ExactPosition) 1543 1544 1545class TestBaseClassMethods(unittest.TestCase): 1546 """Test if methods from the Bio.Seq base class are called correctly.""" 1547 1548 def setUp(self): 1549 """Load a database.""" 1550 path = "GenBank/cor6_6.gb" 1551 accession = "X62281" 1552 load_database(path) 1553 1554 self.server = BioSeqDatabase.open_database( 1555 driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB 1556 ) 1557 self.db = self.server["biosql-test"] 1558 self.seq1 = self.db.lookup(accession=accession).seq 1559 records = SeqIO.parse(path, "genbank") 1560 for record in records: 1561 if accession in record.annotations["accessions"]: 1562 break 1563 else: 1564 raise RuntimeError( 1565 "Failed to find accession %s in GenBank file" % accession 1566 ) 1567 self.seq2 = record.seq 1568 1569 def tearDown(self): 1570 self.server.close() 1571 destroy_database() 1572 del self.db 1573 del self.seq1 1574 del self.seq2 1575 del self.server 1576 1577 def test_bytes(self): 1578 b = bytes(self.seq1) 1579 self.assertIsInstance(b, bytes) 1580 self.assertEqual(len(b), 880) 1581 self.assertEqual(b, bytes(self.seq2)) 1582 1583 def test_hash(self): 1584 self.assertEqual(hash(self.seq1), hash(self.seq2)) 1585 1586 def test_add(self): 1587 self.assertIsInstance(self.seq1 + "ABCD", Seq) 1588 self.assertEqual(self.seq1 + "ABCD", self.seq2 + "ABCD") 1589 1590 def test_radd(self): 1591 self.assertIsInstance("ABCD" + self.seq1, Seq) 1592 self.assertEqual("ABCD" + self.seq1, "ABCD" + self.seq2) 1593 1594 def test_mul(self): 1595 self.assertIsInstance(2 * self.seq1, Seq) 1596 self.assertEqual(2 * self.seq1, 2 * self.seq2) 1597 self.assertIsInstance(self.seq1 * 2, Seq) 1598 self.assertEqual(self.seq1 * 2, self.seq2 * 2) 1599 1600 def test_contains(self): 1601 for seq in (self.seq1, self.seq2): 1602 self.assertIn("CCTTAAGCCCA", seq) 1603 self.assertNotIn("ACGTACGT", seq) 1604 1605 def test_repr(self): 1606 self.assertIsInstance(repr(self.seq1), str) 1607 self.assertEqual(repr(self.seq1), repr(self.seq2)) 1608 1609 def test_str(self): 1610 self.assertIsInstance(str(self.seq1), str) 1611 self.assertEqual(str(self.seq1), str(self.seq2)) 1612 1613 def test_count(self): 1614 self.assertEqual(self.seq1.count("CT"), self.seq2.count("CT")) 1615 self.assertEqual(self.seq1.count("CT", 75), self.seq2.count("CT", 75)) 1616 self.assertEqual( 1617 self.seq1.count("CT", 125, 250), self.seq2.count("CT", 125, 250) 1618 ) 1619 1620 def test_find(self): 1621 self.assertEqual(self.seq1.find("CT"), self.seq2.find("CT")) 1622 self.assertEqual(self.seq1.find("CT", 75), self.seq2.find("CT", 75)) 1623 self.assertEqual(self.seq1.find("CG", 75, 100), self.seq2.find("CG", 75, 100)) 1624 self.assertEqual( 1625 self.seq1.find("CT", None, 100), self.seq2.find("CT", None, 100) 1626 ) 1627 1628 def test_rfind(self): 1629 self.assertEqual(self.seq1.rfind("CT"), self.seq2.rfind("CT")) 1630 self.assertEqual(self.seq1.rfind("CT", 450), self.seq2.rfind("CT", 450)) 1631 self.assertEqual( 1632 self.seq1.rfind("CT", None, 100), self.seq2.rfind("CT", None, 100) 1633 ) 1634 self.assertEqual(self.seq1.rfind("CT", 75, 100), self.seq2.rfind("CT", 75, 100)) 1635 1636 def test_index(self): 1637 self.assertEqual(self.seq1.index("CT"), self.seq2.index("CT")) 1638 self.assertEqual(self.seq1.index("CT", 75), self.seq2.index("CT", 75)) 1639 self.assertEqual( 1640 self.seq1.index("CT", None, 100), self.seq2.index("CT", None, 100) 1641 ) 1642 for seq in (self.seq1, self.seq2): 1643 self.assertRaises(ValueError, seq.index, "CG", 75, 100) 1644 self.assertRaises(ValueError, seq.index, "CG", 75, 100) 1645 1646 def test_rindex(self): 1647 self.assertEqual(self.seq1.rindex("CT"), self.seq2.rindex("CT")) 1648 self.assertEqual( 1649 self.seq1.rindex("CT", None, 100), self.seq2.rindex("CT", None, 100) 1650 ) 1651 for seq in (self.seq1, self.seq2): 1652 self.assertRaises(ValueError, seq.rindex, "AG", 850) 1653 self.assertRaises(ValueError, seq.rindex, "CG", 75, 100) 1654 1655 def test_startswith(self): 1656 for seq in (self.seq1, self.seq2): 1657 self.assertTrue(seq.startswith("ATTT")) 1658 self.assertTrue(seq.startswith("TAAA", start=10)) 1659 self.assertTrue(seq.startswith("TAAA", start=10, end=14)) 1660 self.assertFalse(seq.startswith("TAAA", start=10, end=12)) 1661 1662 def test_endswith(self): 1663 for seq in (self.seq1, self.seq2): 1664 self.assertTrue(seq.endswith("TATA")) 1665 self.assertTrue(seq.endswith("TATA", 876)) 1666 self.assertTrue(seq.endswith("ATTA", 872, 878)) 1667 self.assertFalse(seq.endswith("ATTA", 876, 878)) 1668 1669 def test_split(self): 1670 self.assertEqual(self.seq1.split(), self.seq2.split()) 1671 self.assertEqual(self.seq1.split("C"), self.seq2.split("C")) 1672 self.assertEqual(self.seq1.split("C", 1), self.seq2.split("C", 1)) 1673 1674 def test_rsplit(self): 1675 self.assertEqual(self.seq1.rsplit(), self.seq2.rsplit()) 1676 self.assertEqual(self.seq1.rsplit("C"), self.seq2.rsplit("C")) 1677 self.assertEqual(self.seq1.rsplit("C", 1), self.seq2.rsplit("C", 1)) 1678 1679 def test_strip(self): 1680 self.assertEqual(self.seq1.strip("G"), self.seq2.strip("G")) 1681 1682 def test_lstrip(self, chars=None): 1683 self.assertEqual(self.seq1.lstrip("G"), self.seq2.lstrip("G")) 1684 1685 def test_rstrip(self, chars=None): 1686 self.assertEqual(self.seq1.rstrip("G"), self.seq2.rstrip("G")) 1687 1688 def test_upper(self): 1689 self.assertEqual(self.seq1.upper(), self.seq2.upper()) 1690 1691 def test_lower(self): 1692 self.assertEqual(self.seq1.lower(), self.seq2.lower()) 1693 1694 def test_replace(self): 1695 # seq.transcribe uses seq._data.replace 1696 self.assertEqual(self.seq1.transcribe(), self.seq2.transcribe()) 1697 1698 def test_translate(self): 1699 # seq.complement uses seq._data.translate 1700 self.assertEqual(self.seq1.complement(), self.seq2.complement()) 1701