1# This code is part of the Biopython distribution and governed by its
2# license.  Please see the LICENSE file that should have been included
3# as part of this package.
4"""Dealing with storage of biopython objects in a BioSQL relational db."""
5
6import configparser
7import os
8import platform
9import tempfile
10import time
11import unittest
12
13from io import StringIO
14
15# Hide annoying warnings from things like bonds in GenBank features,
16# or PostgreSQL schema rules. TODO - test these warnings are raised!
17import warnings
18from Bio import BiopythonWarning
19
20# local stuff
21from Bio import MissingExternalDependencyError
22from Bio.Seq import Seq, MutableSeq, UndefinedSequenceError
23from Bio.SeqFeature import SeqFeature, UnknownPosition, ExactPosition
24from Bio import SeqIO
25from Bio.SeqRecord import SeqRecord
26
27from BioSQL import BioSeqDatabase
28from BioSQL import BioSeq
29
30from seq_tests_common import SeqRecordTestBaseClass
31
# This module only supplies shared helpers and test cases; it is meant to be
# imported by the driver-specific test_BioSQL_*.py scripts, never run itself.
if __name__ == "__main__":
    raise RuntimeError("Call this via test_BioSQL_*.py not directly")

# Exporting these to the test_BioSQL_XXX.py files which import this file:
# DBDRIVER, DBTYPE, DBHOST, DBUSER, DBPASSWD, TESTDB, DBSCHEMA, SQL_FILE, SYSTEM

# Platform name as reported by platform.system(); check_config() compares it
# against "Java" to decide how to look for the database driver.
SYSTEM = platform.system()
39
40
def load_biosql_ini(DBTYPE):
    """Read the connection settings for one database type from biosql.ini.

    The file is looked up in the current working directory and the section
    named by DBTYPE is consulted.

    Returns the tuple (dbhost, dbuser, dbpasswd, testdb).

    Raises MissingExternalDependencyError if biosql.ini does not exist.
    """
    ini_name = "biosql.ini"
    if not os.path.isfile(ini_name):
        raise MissingExternalDependencyError(
            "BioSQL test configuration file biosql.ini missing (see biosql.ini.sample)"
        )

    parser = configparser.ConfigParser()
    parser.read(ini_name)
    option_names = ("dbhost", "dbuser", "dbpasswd", "testdb")
    return tuple(parser.get(DBTYPE, option) for option in option_names)
55
56
def temp_db_filename():
    """Create an empty temporary file for a SQLite database and return its path.

    An in-memory database (":memory:") is not an option because the tests
    expect the database to survive between individual tests.  The file is
    created under /dev/shm when that works, otherwise in the default
    temporary directory.
    """
    suffix = "_BioSQL.db"
    try:
        descriptor, path = tempfile.mkstemp(suffix=suffix, dir="/dev/shm")
    except OSError:
        # /dev/shm is not usable here, fall back to the default location.
        descriptor, path = tempfile.mkstemp(suffix=suffix)
    # Only the name is wanted; release the OS-level handle straight away.
    os.close(descriptor)
    return path
70
71
def check_config(dbdriver, dbtype, dbhost, dbuser, dbpasswd, testdb):
    """Store the BioSQL test settings as module globals and validate them.

    The six arguments are copied into the globals DBDRIVER, DBTYPE, DBHOST,
    DBUSER, DBPASSWD and TESTDB.  The function then checks that the driver
    can be imported, that a connection can be opened and closed, and that
    the schema file BioSQL/biosqldb-<dbtype>.sql exists under the current
    working directory (its name and full path are stored in DBSCHEMA and
    SQL_FILE).

    Raises MissingExternalDependencyError if any of these checks fails.
    """
    global DBDRIVER, DBTYPE, DBHOST, DBUSER, DBPASSWD, TESTDB, DBSCHEMA
    global SYSTEM, SQL_FILE
    DBDRIVER, DBTYPE, DBHOST = dbdriver, dbtype, dbhost
    DBUSER, DBPASSWD, TESTDB = dbuser, dbpasswd, testdb

    # Driver, database type and user name are all mandatory.
    if not (DBDRIVER and DBTYPE and DBUSER):
        raise MissingExternalDependencyError("Incomplete BioSQL test settings")

    # Make sure the database driver is installed.
    if SYSTEM == "Java":
        try:
            if DBDRIVER == "MySQLdb":
                import com.mysql.jdbc.Driver
            elif DBDRIVER in ("psycopg2", "pgdb"):
                import org.postgresql.Driver
        except ImportError:
            raise MissingExternalDependencyError(
                "Install the JDBC driver for %s to use BioSQL " % DBTYPE
            ) from None
    else:
        try:
            __import__(DBDRIVER)
        except ImportError:
            if DBDRIVER == "MySQLdb":
                hint = (
                    "Install MySQLdb or mysqlclient if you want to use %s with BioSQL "
                    % (DBTYPE)
                )
            else:
                hint = "Install %s if you want to use %s with BioSQL " % (
                    DBDRIVER,
                    DBTYPE,
                )
            raise MissingExternalDependencyError(hint) from None

    # Try to open and close a connection with the given settings.
    try:
        if DBDRIVER == "sqlite3":
            connect_args = {"driver": DBDRIVER, "db": TESTDB}
        else:
            connect_args = {
                "driver": DBDRIVER,
                "host": DBHOST,
                "user": DBUSER,
                "passwd": DBPASSWD,
            }
        server = BioSeqDatabase.open_database(**connect_args)
        server.close()
        del server
    except Exception as e:
        raise MissingExternalDependencyError(
            "Connection failed, check settings if you plan to use BioSQL: %s" % e
        ) from None

    # Finally the schema file for this database type must be present.
    DBSCHEMA = "biosqldb-" + DBTYPE + ".sql"
    SQL_FILE = os.path.join(os.getcwd(), "BioSQL", DBSCHEMA)

    if os.path.isfile(SQL_FILE):
        return
    raise MissingExternalDependencyError("Missing SQL schema file: %s" % SQL_FILE)
132
133
def _do_db_cleanup():
    """Cleanup everything from TESTDB.

    Relevant for MySQL and PostgreSQL.

    For the psycopg2 and pgdb drivers everything owned by DBUSER inside
    TESTDB is dropped (TESTDB itself must already exist).  For any other
    driver TESTDB is dropped, if possible, and created again.

    The server connection is always closed, including when one of the SQL
    statements raises.
    """
    if DBDRIVER in ["psycopg2", "pgdb"]:
        # first open a connection the database
        # notice that postgres doesn't have createdb privileges, so
        # the TESTDB must exist
        server = BioSeqDatabase.open_database(
            driver=DBDRIVER, host=DBHOST, user=DBUSER, passwd=DBPASSWD, db=TESTDB
        )
        try:
            # The pgdb postgres driver does not support autocommit, so here we
            # commit the current transaction so that 'drop database' query
            # will be outside a transaction block
            server.adaptor.cursor.execute("COMMIT")
            # with Postgres, can get errors about database still being used.
            # Wait briefly to be sure previous tests are done with it.
            time.sleep(1)
            # drop anything in the database
            sql = r"DROP OWNED BY " + DBUSER
            server.adaptor.cursor.execute(sql, ())
        finally:
            server.close()
    else:
        # first open a connection to create the database
        server = BioSeqDatabase.open_database(
            driver=DBDRIVER, host=DBHOST, user=DBUSER, passwd=DBPASSWD
        )
        try:
            # Auto-commit, for the adaptors which offer it
            try:
                server.adaptor.autocommit()
            except AttributeError:
                pass
            # drop the database
            try:
                sql = r"DROP DATABASE " + TESTDB
                server.adaptor.cursor.execute(sql, ())
            except (
                server.module.OperationalError,
                server.module.Error,
                server.module.DatabaseError,
            ):  # the database doesn't exist
                pass
            except (
                server.module.IntegrityError,
                server.module.ProgrammingError,
            ) as e:  # ditto--perhaps
                # NOTE(review): in the PEP 249 exception hierarchy ``Error``
                # is the base class of both of these, so for a compliant
                # driver the clause above already handles them and this one
                # looks unreachable.  Kept so behaviour is unchanged.
                if str(e).find('database "%s" does not exist' % TESTDB) == -1:
                    raise
            # create a new database
            sql = r"CREATE DATABASE " + TESTDB
            server.adaptor.execute(sql, ())
        finally:
            server.close()
190
191
def create_database():
    """Delete any existing BioSQL test DB, then (re)create an empty BioSQL DB.

    For sqlite3 the old database file is removed (best effort) and the
    global TESTDB is pointed at a brand new temporary file; for every other
    driver _do_db_cleanup() resets the database on the server.  The schema
    in SQL_FILE is then loaded over a new connection, which is always closed
    again, also when loading fails.

    Returns TESTDB name which will change for SQLite.
    """
    global TESTDB
    if DBDRIVER in ["sqlite3"]:
        if os.path.exists(TESTDB):
            try:
                os.remove(TESTDB)
            except OSError:
                # Give a lingering handle a moment to go away, then retry.
                time.sleep(1)
                try:
                    os.remove(TESTDB)
                except OSError:
                    # Seen this with PyPy 2.1 (and older) on Windows -
                    # which suggests an open handle still exists?
                    print("Could not remove %r" % TESTDB)
        # Now pick a new filename - just in case there is a stale handle
        # (which might be happening under Windows...)
        TESTDB = temp_db_filename()
    else:
        _do_db_cleanup()

    # now open a connection to load the database
    server = BioSeqDatabase.open_database(
        driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
    )
    try:
        server.load_database_sql(SQL_FILE)
        server.commit()
    finally:
        # Success or failure, the handle must be closed exactly once.
        server.close()

    return TESTDB
231
232
def destroy_database():
    """Remove the temporary sqlite3 database file, if there is one.

    Does nothing for any driver other than sqlite3, or when the file named
    by TESTDB does not exist.
    """
    if DBDRIVER != "sqlite3":
        return
    if os.path.exists(TESTDB):
        os.remove(TESTDB)
238
239
def load_database(gb_filename_or_handle):
    """Create a fresh BioSQL database and fill it from one GenBank file.

    The records go into the "biosql-test" namespace.  Any record whose
    molecule_type annotation is "mRNA" is relabelled as "DNA" before it is
    loaded.  Returns the count reported by the namespace's load() call.

    This is useful for running tests against a newly created database.
    """
    testdb_name = create_database()
    # open a connection to the new, empty database
    server = BioSeqDatabase.open_database(
        driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=testdb_name
    )
    namespace = server.new_database("biosql-test")

    # read the whole GenBank file first, adjusting annotations as we go
    records = []
    for record in SeqIO.parse(gb_filename_or_handle, "gb"):
        annotations = record.annotations
        if annotations.get("molecule_type") == "mRNA":
            annotations["molecule_type"] = "DNA"
        records.append(record)

    # then store everything in one go
    loaded = namespace.load(records)
    server.commit()
    server.close()
    return loaded
265
266
def load_multi_database(gb_filename_or_handle, gb_filename_or_handle2):
    """Create a fresh BioSQL database holding two GenBank files.

    The first file is loaded into the "biosql-test" namespace and the second
    into "biosql-test2".  Returns the sum of the two counts reported by the
    load() calls.

    This is useful for running tests against a newly created database.
    """
    testdb_name = create_database()
    # open a connection to the new, empty database
    server = BioSeqDatabase.open_database(
        driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=testdb_name
    )

    sources = (
        ("biosql-test", gb_filename_or_handle),
        ("biosql-test2", gb_filename_or_handle2),
    )
    counts = []
    for namespace_name, source in sources:
        namespace = server.new_database(namespace_name)
        # parse the GenBank file and put it straight into its namespace
        counts.append(namespace.load(SeqIO.parse(source, "gb")))
    server.commit()

    server.close()
    first_count, second_count = counts
    return first_count + second_count
295
296
class MultiReadTest(unittest.TestCase):
    """Read back a BioSQL database which holds two separate namespaces."""

    loaded_db = 0

    def setUp(self):
        """Build a fresh two-namespace database and connect to it."""
        load_multi_database("GenBank/cor6_6.gb", "GenBank/NC_000932.gb")

        connection = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )
        self.server = connection

        self.db = connection["biosql-test"]
        self.db2 = connection["biosql-test2"]

    def tearDown(self):
        """Close the connection, remove the database and drop references."""
        self.server.close()
        destroy_database()
        for attribute in ("db", "db2", "server"):
            delattr(self, attribute)

    def test_server(self):
        """Check BioSeqDatabase methods."""
        namespaces = ["biosql-test", "biosql-test2"]
        for name in namespaces:
            self.assertIn(name, self.server)
        self.assertEqual(2, len(self.server))
        self.assertEqual(namespaces, list(self.server.keys()))
        # The namespaces can be deleted again...
        for name in namespaces:
            del self.server[name]
        self.assertEqual(0, len(self.server))
        # ...but deleting an unknown one is an error.
        with self.assertRaises(KeyError):
            del self.server["non-existant-name"]

    def test_get_db_items(self):
        """Check list, keys, length etc."""
        namespace = self.db
        records = list(namespace.values())
        identifiers = list(namespace)
        expected_size = len(records)
        self.assertEqual(expected_size, len(namespace))
        self.assertEqual(expected_size, len(list(namespace)))
        self.assertEqual(expected_size, len(list(namespace.items())))
        self.assertEqual(expected_size, len(list(namespace.keys())))
        self.assertEqual(expected_size, len(list(namespace.values())))
        # items() must agree with the separately fetched keys and values
        paired = zip(identifiers, records, namespace.items())
        for identifier, record, (item_key, item_record) in paired:
            self.assertEqual(identifier, item_key)
            self.assertEqual(record.id, item_record.id)
        for identifier in identifiers:
            del namespace[identifier]
        self.assertEqual(0, len(namespace))
        with self.assertRaises(KeyError):
            del namespace["non-existant-name"]

    def test_cross_retrieval_of_items(self):
        """Test that valid ids can't be retrieved between namespaces."""
        first_namespace = self.db
        for foreign_id in self.db2.keys():
            with self.assertRaises(KeyError):
                first_namespace[foreign_id]
361
362
class ReadTest(unittest.TestCase):
    """Read records back from a freshly loaded single-namespace database."""

    loaded_db = 0

    def setUp(self):
        """Load cor6_6.gb into a new database and connect to it."""
        load_database("GenBank/cor6_6.gb")

        connection = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )
        self.server = connection

        self.db = connection["biosql-test"]

    def tearDown(self):
        """Close the connection, remove the database and drop references."""
        self.server.close()
        destroy_database()
        for attribute in ("db", "server"):
            delattr(self, attribute)

    def test_server(self):
        """Check BioSeqDatabase methods."""
        namespace_name = "biosql-test"
        self.assertIn(namespace_name, self.server)
        self.assertEqual(1, len(self.server))
        self.assertEqual([namespace_name], list(self.server.keys()))
        # The namespace can be deleted again...
        del self.server[namespace_name]
        self.assertEqual(0, len(self.server))
        # ...but deleting an unknown one is an error.
        with self.assertRaises(KeyError):
            del self.server["non-existant-name"]

    def test_get_db_items(self):
        """Check list, keys, length etc."""
        namespace = self.db
        records = list(namespace.values())
        identifiers = list(namespace)
        expected_size = len(records)
        self.assertEqual(expected_size, len(namespace))
        self.assertEqual(expected_size, len(list(namespace.items())))
        self.assertEqual(expected_size, len(list(namespace)))
        self.assertEqual(expected_size, len(list(namespace.values())))
        # items() must agree with the separately fetched keys and values
        paired = zip(identifiers, records, namespace.items())
        for identifier, record, (item_key, item_record) in paired:
            self.assertEqual(identifier, item_key)
            self.assertEqual(record.id, item_record.id)
        for identifier in identifiers:
            del namespace[identifier]
        self.assertEqual(0, len(namespace))
        with self.assertRaises(KeyError):
            del namespace["non-existant-name"]

    def test_lookup_items(self):
        """Test retrieval of items using various ids."""
        lookup = self.db.lookup

        # by accession
        lookup(accession="X62281")
        with self.assertRaises(IndexError):
            lookup(accession="Not real")

        # by display id
        lookup(display_id="ATKIN2")
        with self.assertRaises(IndexError):
            lookup(display_id="Not real")

        # by primary id
        lookup(primary_id="16353")
        with self.assertRaises(IndexError):
            lookup(primary_id="Not Real")
425
426
427class SeqInterfaceTest(unittest.TestCase):
428    """Make sure the BioSQL objects implement the expected biopython interface."""
429
430    def setUp(self):
431        """Load a database."""
432        load_database("GenBank/cor6_6.gb")
433
434        self.server = BioSeqDatabase.open_database(
435            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
436        )
437        self.db = self.server["biosql-test"]
438        self.item = self.db.lookup(accession="X62281")
439        self.item2 = self.db.lookup(accession="AJ237582")
440
441    def tearDown(self):
442        self.server.close()
443        destroy_database()
444        del self.db
445        del self.item
446        del self.server
447
448    def test_seq_record(self):
449        """Make sure SeqRecords from BioSQL implement the right interface."""
450        test_record = self.item
451        self.assertIsInstance(test_record.seq, Seq)
452        self.assertEqual(test_record.id, "X62281.1", test_record.id)
453        self.assertEqual(test_record.name, "ATKIN2")
454        self.assertEqual(test_record.description, "A.thaliana kin2 gene")
455        self.assertTrue(hasattr(test_record, "annotations"))
456        # XXX should do something with annotations once they are like
457        # a dictionary
458        for feature in test_record.features:
459            self.assertIsInstance(feature, SeqFeature)
460        # shouldn't cause any errors!
461        self.assertIsInstance(str(test_record), str)
462        # Confirm can delete annotations etc to test these properties
463        del test_record.annotations
464        del test_record.dbxrefs
465        del test_record.features
466        del test_record.seq
467
468    def test_seq(self):
469        """Make sure Seqs from BioSQL implement the right interface."""
470        test_seq = self.item.seq
471        string_rep = str(test_seq)
472        self.assertEqual(string_rep, str(test_seq))  # check __str__ too
473        self.assertEqual(type(string_rep), type(""))
474        self.assertEqual(len(test_seq), 880)
475        self.assertEqual(test_seq[879], "A")
476        self.assertEqual(test_seq[-1], "A")
477        self.assertEqual(test_seq[0], "A")
478        self.assertEqual(test_seq[-880], "A")
479        self.assertRaises(IndexError, test_seq.__getitem__, 880)
480        self.assertRaises(IndexError, test_seq.__getitem__, -881)
481        self.assertRaises(TypeError, test_seq.__getitem__, None)
482
483    def test_convert(self):
484        """Check can turn a Seq object from BioSQL into a Seq or MutableSeq."""
485        test_seq = self.item.seq
486
487        other = Seq(test_seq)
488        self.assertEqual(test_seq, other)
489        self.assertIsInstance(other, Seq)
490
491        other = MutableSeq(test_seq)
492        self.assertEqual(test_seq, other)
493        self.assertIsInstance(other, MutableSeq)
494
495    def test_addition(self):
496        """Check can add Seq objects from BioSQL together."""
497        test_seq = self.item.seq
498        for other in [
499            Seq("ACGT"),
500            MutableSeq("ACGT"),
501            "ACGT",
502            test_seq,
503        ]:
504            test = test_seq + other
505            self.assertEqual(test, str(test_seq) + str(other))
506            self.assertIsInstance(test, Seq)
507            test = other + test_seq
508            self.assertEqual(test, str(other) + str(test_seq))
509
510    def test_multiplication(self):
511        """Check can multiply Seq objects from BioSQL by integers."""
512        test_seq = self.item.seq
513        tripled = test_seq * 3
514        # Test Seq.__mul__
515        self.assertIsInstance(tripled, Seq)
516        self.assertEqual(tripled, str(test_seq) * 3)
517        # Test Seq.__rmul__
518        tripled = 3 * test_seq
519        self.assertIsInstance(tripled, Seq)
520        self.assertEqual(tripled, str(test_seq) * 3)
521        # Test Seq.__imul__
522        original = self.item.seq
523        tripled = test_seq
524        tripled *= 3
525        self.assertIsInstance(tripled, Seq)
526        self.assertEqual(tripled, str(original) * 3)
527
528    def test_seq_slicing(self):
529        """Check that slices of sequences are retrieved properly."""
530        test_seq = self.item.seq
531        new_seq = test_seq[:10]
532        self.assertIsInstance(new_seq, Seq)
533        # simple slicing
534        self.assertEqual(test_seq[:5], "ATTTG")
535        self.assertEqual(test_seq[0:5], "ATTTG")
536        self.assertEqual(test_seq[2:3], "T")
537        self.assertEqual(test_seq[2:4], "TT")
538        self.assertEqual(test_seq[870:], "TTGAATTATA")
539        # getting more fancy
540        self.assertEqual(test_seq[-1], "A")
541        self.assertEqual(test_seq[1], "T")
542        self.assertEqual(test_seq[-10:][5:], "TTATA")
543
544    def test_record_slicing(self):
545        """Check that slices of DBSeqRecord are retrieved properly."""
546        new_rec = self.item[400:]
547        self.assertIsInstance(new_rec, SeqRecord)
548        self.assertEqual(len(new_rec), 480)
549        self.assertEqual(len(new_rec.features), 5)
550
551    def test_seq_features(self):
552        """Check SeqFeatures of a sequence."""
553        test_features = self.item.features
554        cds_feature = test_features[6]
555        self.assertEqual(cds_feature.type, "CDS")
556        self.assertEqual(
557            str(cds_feature.location), "join{[103:160](+), [319:390](+), [503:579](+)}"
558        )
559
560        msg = "Missing expected entries, have %r" % cds_feature.qualifiers
561        self.assertIn("gene", cds_feature.qualifiers)
562        self.assertIn("protein_id", cds_feature.qualifiers)
563        self.assertIn("codon_start", cds_feature.qualifiers)
564        self.assertEqual(cds_feature.qualifiers.get("gene"), ["kin2"])
565        self.assertEqual(cds_feature.qualifiers.get("protein_id"), ["CAA44171.1"])
566        self.assertEqual(cds_feature.qualifiers.get("codon_start"), ["1"])
567
568        self.assertIn("db_xref", cds_feature.qualifiers)
569        multi_ann = cds_feature.qualifiers["db_xref"]
570        self.assertEqual(len(multi_ann), 2)
571        self.assertIn("GI:16354", multi_ann)
572        self.assertIn("SWISS-PROT:P31169", multi_ann)
573
574    def test_eq(self):
575        seq1 = self.item.seq
576        seq2 = self.item2.seq
577        self.assertEqual(seq1[30:32], seq2[3:5])
578        self.assertEqual(seq1[30:32], "CA")
579        self.assertEqual(seq2[3:5], "CA")
580        self.assertEqual(seq1[30:32], b"CA")
581        self.assertEqual(seq2[3:5], b"CA")
582        self.assertEqual(seq1[30:32], Seq("CA"))
583        self.assertEqual(seq2[3:5], Seq("CA"))
584        self.assertEqual(seq1[30:32], MutableSeq("CA"))
585        self.assertEqual(seq2[3:5], MutableSeq("CA"))
586        self.assertEqual(seq2[3:5], seq1[30:32])
587        self.assertEqual("CA", seq1[30:32])
588        self.assertEqual("CA", seq2[3:5])
589        self.assertEqual(b"CA", seq1[30:32])
590        self.assertEqual(b"CA", seq2[3:5])
591        self.assertEqual(Seq("CA"), seq1[30:32])
592        self.assertEqual(Seq("CA"), seq2[3:5])
593        self.assertEqual(MutableSeq("CA"), seq1[30:32])
594        self.assertEqual(MutableSeq("CA"), seq2[3:5])
595        with self.assertRaises(UndefinedSequenceError):
596            seq1 == Seq(None, len(seq1))
597        with self.assertRaises(UndefinedSequenceError):
598            seq2 == Seq(None, len(seq2))
599        with self.assertRaises(UndefinedSequenceError):
600            seq1 == Seq(None, 10)
601        with self.assertRaises(UndefinedSequenceError):
602            seq2 == Seq(None, 10)
603        with self.assertRaises(UndefinedSequenceError):
604            Seq(None, len(seq1)) == seq1
605        with self.assertRaises(UndefinedSequenceError):
606            Seq(None, len(seq2)) == seq2
607        with self.assertRaises(UndefinedSequenceError):
608            Seq(None, 10) == seq1
609        with self.assertRaises(UndefinedSequenceError):
610            Seq(None, 10) == seq2
611
612    def test_ne(self):
613        seq1 = self.item.seq
614        seq2 = self.item2.seq
615        self.assertNotEqual(seq1, seq2)
616        self.assertNotEqual(seq1, "CA")
617        self.assertNotEqual(seq2, "CA")
618        self.assertNotEqual(seq1, b"CA")
619        self.assertNotEqual(seq2, b"CA")
620        self.assertNotEqual(seq1, Seq("CA"))
621        self.assertNotEqual(seq2, Seq("CA"))
622        self.assertNotEqual(seq1, MutableSeq("CA"))
623        self.assertNotEqual(seq2, MutableSeq("CA"))
624        self.assertNotEqual(seq1[30:32], "GG")
625        self.assertNotEqual(seq2[3:5], "GG")
626        self.assertNotEqual(seq1[30:32], b"GG")
627        self.assertNotEqual(seq2[3:5], b"GG")
628        self.assertNotEqual(seq1[30:32], Seq("GG"))
629        self.assertNotEqual(seq2[3:5], Seq("GG"))
630        self.assertNotEqual(seq1[30:32], MutableSeq("GG"))
631        self.assertNotEqual(seq2[3:5], MutableSeq("GG"))
632        self.assertNotEqual(seq2, seq1)
633        self.assertNotEqual("CA", seq1)
634        self.assertNotEqual("CA", seq2)
635        self.assertNotEqual(b"CA", seq1)
636        self.assertNotEqual(b"CA", seq2)
637        self.assertNotEqual(Seq("CA"), seq1)
638        self.assertNotEqual(Seq("CA"), seq2)
639        self.assertNotEqual(MutableSeq("CA"), seq1)
640        self.assertNotEqual(MutableSeq("CA"), seq2)
641        self.assertNotEqual("GG", seq1[30:32])
642        self.assertNotEqual("GG", seq2[3:5])
643        self.assertNotEqual(b"GG", seq1[30:32])
644        self.assertNotEqual(b"GG", seq2[3:5])
645        self.assertNotEqual(Seq("GG"), seq1[30:32])
646        self.assertNotEqual(Seq("GG"), seq2[3:5])
647        self.assertNotEqual(MutableSeq("GG"), seq1[30:32])
648        self.assertNotEqual(MutableSeq("GG"), seq2[3:5])
649        with self.assertRaises(UndefinedSequenceError):
650            seq1 != Seq(None, len(seq1))
651        with self.assertRaises(UndefinedSequenceError):
652            seq2 != Seq(None, len(seq2))
653        with self.assertRaises(UndefinedSequenceError):
654            seq1 != Seq(None, 10)
655        with self.assertRaises(UndefinedSequenceError):
656            seq2 != Seq(None, 10)
657        with self.assertRaises(UndefinedSequenceError):
658            Seq(None, len(seq1)) != seq1
659        with self.assertRaises(UndefinedSequenceError):
660            Seq(None, len(seq2)) != seq2
661        with self.assertRaises(UndefinedSequenceError):
662            Seq(None, 10) != seq1
663        with self.assertRaises(UndefinedSequenceError):
664            Seq(None, 10) != seq2
665
666    def test_lt(self):
667        seq1 = self.item.seq
668        seq2 = self.item2.seq
669        self.assertLess(seq1, seq2)
670        self.assertLess(seq1, "CC")
671        self.assertLess("CC", seq2)
672        self.assertLess(seq1, b"CC")
673        self.assertLess(b"CC", seq2)
674        self.assertLess(seq1, Seq("CC"))
675        self.assertLess(Seq("CC"), seq2)
676        self.assertLess(seq1, MutableSeq("CC"))
677        self.assertLess(MutableSeq("CC"), seq2)
678        self.assertLess("AA", seq1)
679        self.assertLess("AA", seq2)
680        self.assertLess(b"AA", seq1)
681        self.assertLess(b"AA", seq2)
682        self.assertLess(Seq("AA"), seq1)
683        self.assertLess(Seq("AA"), seq2)
684        self.assertLess(MutableSeq("AA"), seq1)
685        self.assertLess(MutableSeq("AA"), seq2)
686        self.assertLess(seq1, "TT")
687        self.assertLess(seq2, "TT")
688        self.assertLess(seq1, b"TT")
689        self.assertLess(seq2, b"TT")
690        self.assertLess(seq1, Seq("TT"))
691        self.assertLess(seq2, Seq("TT"))
692        self.assertLess(seq1, MutableSeq("TT"))
693        self.assertLess(seq2, MutableSeq("TT"))
694        with self.assertRaises(UndefinedSequenceError):
695            seq1 < Seq(None, len(seq1))
696        with self.assertRaises(UndefinedSequenceError):
697            seq2 < Seq(None, len(seq2))
698        with self.assertRaises(UndefinedSequenceError):
699            seq1 < Seq(None, 10)
700        with self.assertRaises(UndefinedSequenceError):
701            seq2 < Seq(None, 10)
702        self.assertLess("AA", seq1[30:32])
703        self.assertLess("AA", seq2[3:5])
704        self.assertLess(b"AA", seq1[30:32])
705        self.assertLess(b"AA", seq2[3:5])
706        self.assertLess(seq1[30:32], seq2[3:7])
707        self.assertLess(Seq("AA"), seq1[30:32])
708        self.assertLess(Seq("AA"), seq2[3:5])
709        self.assertLess(MutableSeq("AA"), seq1[30:32])
710        self.assertLess(MutableSeq("AA"), seq2[3:5])
711        self.assertLess(seq1[30:32], "TT")
712        self.assertLess(seq2[3:5], "TT")
713        self.assertLess(seq1[30:32], b"TT")
714        self.assertLess(seq2[3:5], b"TT")
715        self.assertLess(seq1[30:32], Seq("TT"))
716        self.assertLess(seq2[3:5], Seq("TT"))
717        self.assertLess(seq1[30:32], MutableSeq("TT"))
718        self.assertLess(seq2[3:5], MutableSeq("TT"))
719
720    def test_le(self):
721        seq1 = self.item.seq
722        seq2 = self.item2.seq
723        self.assertLessEqual(seq1, seq2)
724        self.assertLessEqual(seq1, "CC")
725        self.assertLessEqual("CC", seq2)
726        self.assertLessEqual(seq1, b"CC")
727        self.assertLessEqual(b"CC", seq2)
728        self.assertLessEqual(seq1, Seq("CC"))
729        self.assertLessEqual(Seq("CC"), seq2)
730        self.assertLessEqual(seq1, MutableSeq("CC"))
731        self.assertLessEqual(MutableSeq("CC"), seq2)
732        self.assertLessEqual("AA", seq1)
733        self.assertLessEqual("AA", seq2)
734        self.assertLessEqual(b"AA", seq1)
735        self.assertLessEqual(b"AA", seq2)
736        self.assertLessEqual(Seq("AA"), seq1)
737        self.assertLessEqual(Seq("AA"), seq2)
738        self.assertLessEqual(MutableSeq("AA"), seq1)
739        self.assertLessEqual(MutableSeq("AA"), seq2)
740        self.assertLessEqual(seq1, "TT")
741        self.assertLessEqual(seq2, "TT")
742        self.assertLessEqual(seq1, b"TT")
743        self.assertLessEqual(seq2, b"TT")
744        self.assertLessEqual(seq1, Seq("TT"))
745        self.assertLessEqual(seq2, Seq("TT"))
746        self.assertLessEqual(seq1, MutableSeq("TT"))
747        self.assertLessEqual(seq2, MutableSeq("TT"))
748        with self.assertRaises(UndefinedSequenceError):
749            seq1 < Seq(None, len(seq1))
750        with self.assertRaises(UndefinedSequenceError):
751            seq2 < Seq(None, len(seq2))
752        with self.assertRaises(UndefinedSequenceError):
753            seq1 < Seq(None, 10)
754        with self.assertRaises(UndefinedSequenceError):
755            seq2 < Seq(None, 10)
756        self.assertLessEqual("AA", seq1[30:32])
757        self.assertLessEqual("AA", seq2[3:5])
758        self.assertLessEqual(b"AA", seq1[30:32])
759        self.assertLessEqual(b"AA", seq2[3:5])
760        self.assertLessEqual(seq1[30:32], seq2[3:7])
761        self.assertLessEqual(Seq("AA"), seq1[30:32])
762        self.assertLessEqual(Seq("AA"), seq2[3:5])
763        self.assertLessEqual(MutableSeq("AA"), seq1[30:32])
764        self.assertLessEqual(MutableSeq("AA"), seq2[3:5])
765        self.assertLessEqual(seq1[30:32], "TT")
766        self.assertLessEqual(seq2[3:5], "TT")
767        self.assertLessEqual(seq1[30:32], b"TT")
768        self.assertLessEqual(seq2[3:5], b"TT")
769        self.assertLessEqual(seq1[30:32], Seq("TT"))
770        self.assertLessEqual(seq2[3:5], Seq("TT"))
771        self.assertLessEqual(seq1[30:32], MutableSeq("TT"))
772        self.assertLessEqual(seq2[3:5], MutableSeq("TT"))
773
774    def test_gt(self):
775        seq1 = self.item.seq
776        seq2 = self.item2.seq
777        self.assertGreater(seq2, seq1)
778        self.assertGreater("CC", seq1)
779        self.assertGreater(seq2, "CC")
780        self.assertGreater(b"CC", seq1)
781        self.assertGreater(seq2, b"CC")
782        self.assertGreater(Seq("CC"), seq1)
783        self.assertGreater(seq2, Seq("CC"))
784        self.assertGreater(MutableSeq("CC"), seq1)
785        self.assertGreater(seq2, MutableSeq("CC"))
786        self.assertGreater(seq1, "AA")
787        self.assertGreater(seq2, "AA")
788        self.assertGreater(seq1, b"AA")
789        self.assertGreater(seq2, b"AA")
790        self.assertGreater(seq1, Seq("AA"))
791        self.assertGreater(seq2, Seq("AA"))
792        self.assertGreater(seq1, MutableSeq("AA"))
793        self.assertGreater(seq2, MutableSeq("AA"))
794        self.assertGreater("TT", seq1)
795        self.assertGreater("TT", seq2)
796        self.assertGreater(b"TT", seq1)
797        self.assertGreater(b"TT", seq2)
798        self.assertGreater(Seq("TT"), seq1)
799        self.assertGreater(Seq("TT"), seq2)
800        self.assertGreater(MutableSeq("TT"), seq1)
801        self.assertGreater(MutableSeq("TT"), seq2)
802        with self.assertRaises(UndefinedSequenceError):
803            seq1 < Seq(None, len(seq1))
804        with self.assertRaises(UndefinedSequenceError):
805            seq2 < Seq(None, len(seq2))
806        with self.assertRaises(UndefinedSequenceError):
807            seq1 < Seq(None, 10)
808        with self.assertRaises(UndefinedSequenceError):
809            seq2 < Seq(None, 10)
810        self.assertGreater(seq1[30:32], "AA")
811        self.assertGreater(seq2[3:5], "AA")
812        self.assertGreater(seq1[30:32], b"AA")
813        self.assertGreater(seq2[3:5], b"AA")
814        self.assertGreater(seq1[30:34], seq2[3:5])
815        self.assertGreater(seq1[30:32], Seq("AA"))
816        self.assertGreater(seq2[3:5], Seq("AA"))
817        self.assertGreater(seq1[30:32], MutableSeq("AA"))
818        self.assertGreater(seq2[3:5], MutableSeq("AA"))
819        self.assertGreater("TT", seq1[30:32])
820        self.assertGreater("TT", seq2[3:5])
821        self.assertGreater(b"TT", seq1[30:32])
822        self.assertGreater(b"TT", seq2[3:5])
823        self.assertGreater(Seq("TT"), seq1[30:32])
824        self.assertGreater(Seq("TT"), seq2[3:5])
825        self.assertGreater(MutableSeq("TT"), seq1[30:32])
826        self.assertGreater(MutableSeq("TT"), seq2[3:5])
827
828    def test_ge(self):
829        seq1 = self.item.seq
830        seq2 = self.item2.seq
831        self.assertGreaterEqual(seq2, seq1)
832        self.assertGreaterEqual("CC", seq1)
833        self.assertGreaterEqual(seq2, "CC")
834        self.assertGreaterEqual(b"CC", seq1)
835        self.assertGreaterEqual(seq2, b"CC")
836        self.assertGreaterEqual(Seq("CC"), seq1)
837        self.assertGreaterEqual(seq2, Seq("CC"))
838        self.assertGreaterEqual(MutableSeq("CC"), seq1)
839        self.assertGreaterEqual(seq2, MutableSeq("CC"))
840        self.assertGreaterEqual(seq1, "AA")
841        self.assertGreaterEqual(seq2, "AA")
842        self.assertGreaterEqual(seq1, b"AA")
843        self.assertGreaterEqual(seq2, b"AA")
844        self.assertGreaterEqual(seq1, Seq("AA"))
845        self.assertGreaterEqual(seq2, Seq("AA"))
846        self.assertGreaterEqual(seq1, MutableSeq("AA"))
847        self.assertGreaterEqual(seq2, MutableSeq("AA"))
848        self.assertGreaterEqual("TT", seq1)
849        self.assertGreaterEqual("TT", seq2)
850        self.assertGreaterEqual(b"TT", seq1)
851        self.assertGreaterEqual(b"TT", seq2)
852        self.assertGreaterEqual(Seq("TT"), seq1)
853        self.assertGreaterEqual(Seq("TT"), seq2)
854        self.assertGreaterEqual(MutableSeq("TT"), seq1)
855        self.assertGreaterEqual(MutableSeq("TT"), seq2)
856        with self.assertRaises(UndefinedSequenceError):
857            seq1 < Seq(None, len(seq1))
858        with self.assertRaises(UndefinedSequenceError):
859            seq2 < Seq(None, len(seq2))
860        with self.assertRaises(UndefinedSequenceError):
861            seq1 < Seq(None, 10)
862        with self.assertRaises(UndefinedSequenceError):
863            seq2 < Seq(None, 10)
864        self.assertGreaterEqual(seq1[30:32], "AA")
865        self.assertGreaterEqual(seq2[3:5], "AA")
866        self.assertGreaterEqual(seq1[30:32], b"AA")
867        self.assertGreaterEqual(seq2[3:5], b"AA")
868        self.assertGreaterEqual(seq1[30:34], seq2[3:5])
869        self.assertGreaterEqual(seq1[30:32], Seq("AA"))
870        self.assertGreaterEqual(seq2[3:5], Seq("AA"))
871        self.assertGreaterEqual(seq1[30:32], MutableSeq("AA"))
872        self.assertGreaterEqual(seq2[3:5], MutableSeq("AA"))
873        self.assertGreaterEqual("TT", seq1[30:32])
874        self.assertGreaterEqual("TT", seq2[3:5])
875        self.assertGreaterEqual(b"TT", seq1[30:32])
876        self.assertGreaterEqual(b"TT", seq2[3:5])
877        self.assertGreaterEqual(Seq("TT"), seq1[30:32])
878        self.assertGreaterEqual(Seq("TT"), seq2[3:5])
879        self.assertGreaterEqual(MutableSeq("TT"), seq1[30:32])
880        self.assertGreaterEqual(MutableSeq("TT"), seq2[3:5])
881
882
class LoaderTest(unittest.TestCase):
    """Load a database from a GenBank file."""

    def setUp(self):
        """Create the test database and an empty "biosql-test" namespace."""
        # create_database() returns the name of the database to connect to
        TESTDB = create_database()
        namespace = "biosql-test"

        connection = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )
        self.server = connection

        # Start from a clean slate: a KeyError here simply means there was
        # no namespace of that name to get rid of.
        try:
            connection[namespace]
            connection.remove_database(namespace)
        except KeyError:
            pass

        self.db = connection.new_database(namespace)

        # The GenBank records which the test will put into the namespace
        self.iterator = SeqIO.parse("GenBank/cor6_6.gb", "gb")

    def tearDown(self):
        """Close the connection and remove the test database."""
        self.server.close()
        destroy_database()
        del self.db
        del self.server

    def test_load_database(self):
        """Load SeqRecord objects into a BioSQL database."""
        self.db.load(self.iterator)

        # Only simple sanity checks that the right records were loaded;
        # more advanced tests live in a different module.
        loaded = list(self.db.values())
        self.assertEqual(len(loaded), 6)
        self.assertEqual(len(self.db), 6)

        names = sorted(entry.name for entry in loaded)
        identifiers = sorted(entry.id for entry in loaded)
        expected_names = [
            "AF297471",
            "ARU237582",
            "ATCOR66M",
            "ATKIN2",
            "BNAKINI",
            "BRRBIF72",
        ]
        expected_ids = [
            "AF297471.1",
            "AJ237582.1",
            "L31939.1",
            "M81224.1",
            "X55053.1",
            "X62281.1",
        ]
        self.assertEqual(names, expected_names)
        self.assertEqual(identifiers, expected_ids)
945
946
class DeleteTest(unittest.TestCase):
    """Test proper deletion of entries from a database."""

    # Not referenced by any method of this class; kept for compatibility.
    loaded_db = 0

    def setUp(self):
        """Connect to and load up the database."""
        load_database("GenBank/cor6_6.gb")

        self.server = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )

        self.db = self.server["biosql-test"]

    def tearDown(self):
        """Close the connection and remove the test database."""
        self.server.close()
        destroy_database()
        del self.db
        del self.server

    def test_server(self):
        """Check BioSeqDatabase methods."""
        server = self.server
        self.assertIn("biosql-test", server)
        self.assertEqual(1, len(server))
        self.assertEqual(["biosql-test"], list(server.keys()))
        # Check we can delete the namespace...
        del server["biosql-test"]
        self.assertEqual(0, len(server))
        # ...and that deleting an unknown namespace is reported as KeyError
        with self.assertRaises(KeyError):
            del server["non-existant-name"]

    def test_del_db_items(self):
        """Check all associated data is deleted from an item."""
        db = self.db
        sql = "SELECT seqfeature_id from seqfeature where bioentry_id = '%s'"

        # Take a snapshot of the keys, since entries are deleted in the loop
        for seq_id in list(db):
            query = sql % seq_id
            # get the original number of seqfeatures associated with the bioentry
            seqfeatures = db.adaptor.execute_and_fetchall(query)

            del db[seq_id]
            # check to see that the entry in the bioentry table is removed
            self.assertNotIn(seq_id, db)

            # no need to check seqfeature presence if it had none to begin with
            if seqfeatures:
                remaining = db.adaptor.execute_and_fetchall(query)
                # check to see that associated data is removed
                self.assertEqual(len(remaining), 0)

        self.assertEqual(0, len(list(db.values())))
1003
1004
class DupLoadTest(unittest.TestCase):
    """Check a few duplicate conditions fail."""

    def setUp(self):
        """Create an empty database holding an empty "biosql-test" namespace."""
        # Any old database is dropped and a new one created; the call
        # returns the name of the database to connect to.
        TESTDB = create_database()
        connection = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )
        self.server = connection
        # New namespace within the new empty database
        self.db = connection.new_database("biosql-test")

    def tearDown(self):
        """Roll back pending changes, disconnect and remove the database."""
        self.server.rollback()
        self.server.close()
        destroy_database()
        del self.db
        del self.server

    def _assert_load_fails(self, records, allowed_names):
        """Load records and check the failure's class name is in allowed_names."""
        with self.assertRaises(Exception) as context:
            self.db.load(records)
        # The check is on the exception's class *name* because the actual
        # exception class will depend on which DB back end is in use.
        self.assertIn(context.exception.__class__.__name__, allowed_names)

    def test_duplicate_load(self):
        """Make sure can't import a single record twice (in one go)."""
        entry = SeqRecord(
            Seq("ATGCTATGACTAT"), id="Test1", annotations={"molecule_type": "DNA"}
        )
        self._assert_load_fails(
            [entry, entry],
            ["IntegrityError", "UniqueViolation", "AttributeError", "OperationalError"],
        )

    def test_duplicate_load2(self):
        """Make sure can't import a single record twice (in steps)."""
        entry = SeqRecord(
            Seq("ATGCTATGACTAT"), id="Test2", annotations={"molecule_type": "DNA"}
        )
        # The first load goes through...
        loaded = self.db.load([entry])
        self.assertEqual(loaded, 1)
        # ...loading the very same record again must not.
        self._assert_load_fails(
            [entry],
            ["IntegrityError", "UniqueViolation", "AttributeError"],
        )

    def test_duplicate_id_load(self):
        """Make sure can't import records with same ID (in one go)."""
        first = SeqRecord(
            Seq("ATGCTATGACTAT"), id="TestA", annotations={"molecule_type": "DNA"}
        )
        second = SeqRecord(
            Seq("GGGATGCGACTAT"), id="TestA", annotations={"molecule_type": "DNA"}
        )
        self._assert_load_fails(
            [first, second],
            ["IntegrityError", "UniqueViolation", "AttributeError"],
        )
1074
1075
class ClosedLoopTest(SeqRecordTestBaseClass):
    """Test file -> BioSQL -> file."""

    @classmethod
    def setUpClass(cls):
        """Create the test database once for all the tests in this class."""
        # NOTE - For speed I don't bother to create a new database each time,
        # simply a new unique namespace is used for each test.
        # The return value is not kept, loop() uses the module level TESTDB.
        create_database()

    def test_NC_005816(self):
        """From GenBank file to BioSQL and back to a GenBank file, NC_005816."""
        with warnings.catch_warnings():
            # BiopythonWarning: order location operators are not fully supported
            warnings.simplefilter("ignore", BiopythonWarning)
            self.loop("GenBank/NC_005816.gb", "gb")

    def test_NC_000932(self):
        """From GenBank file to BioSQL and back to a GenBank file, NC_000932."""
        self.loop("GenBank/NC_000932.gb", "gb")

    def test_NT_019265(self):
        """From GenBank file to BioSQL and back to a GenBank file, NT_019265."""
        self.loop("GenBank/NT_019265.gb", "gb")

    def test_protein_refseq2(self):
        """From GenBank file to BioSQL and back to a GenBank file, protein_refseq2."""
        with warnings.catch_warnings():
            # BiopythonWarning: order location operators are not fully supported
            warnings.simplefilter("ignore", BiopythonWarning)
            self.loop("GenBank/protein_refseq2.gb", "gb")

    def test_no_ref(self):
        """From GenBank file to BioSQL and back to a GenBank file, noref."""
        self.loop("GenBank/noref.gb", "gb")

    def test_one_of(self):
        """From GenBank file to BioSQL and back to a GenBank file, one_of."""
        self.loop("GenBank/one_of.gb", "gb")

    def test_cor6_6(self):
        """From GenBank file to BioSQL and back to a GenBank file, cor6_6."""
        self.loop("GenBank/cor6_6.gb", "gb")

    def test_arab1(self):
        """From GenBank file to BioSQL and back to a GenBank file, arab1."""
        self.loop("GenBank/arab1.gb", "gb")

    def loop(self, filename, format):
        """Load a file into BioSQL, read it back and write it out as GenBank.

        Arguments:
         - filename - path of the sequence file to parse.
         - format - SeqIO format name used to parse the file.

        The records are compared after each step. The database connection
        is closed even if one of the checks fails.
        """
        original_records = []
        for record in SeqIO.parse(filename, format):
            # Reduce molecule types mentioning RNA to plain "RNA" or "DNA",
            # depending on whether the sequence contains the letter U.
            if "RNA" in record.annotations.get("molecule_type", ""):
                if "U" in record.seq:
                    record.annotations["molecule_type"] = "RNA"
                else:
                    record.annotations["molecule_type"] = "DNA"
            original_records.append(record)
        # now open a connection to load the database
        server = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )
        try:
            db_name = "test_loop_%s" % filename  # new namespace!
            db = server.new_database(db_name)
            count = db.load(original_records)
            self.assertEqual(count, len(original_records))
            server.commit()
            # Now read them back...
            biosql_records = [db.lookup(name=rec.name) for rec in original_records]
            # And check they agree
            self.compare_records(original_records, biosql_records)
            # Now write to a handle, and read them back from it...
            with StringIO() as handle:
                SeqIO.write(biosql_records, handle, "gb")
                handle.seek(0)
                new_records = list(SeqIO.parse(handle, "gb"))
            # And check they still agree
            self.assertEqual(len(new_records), len(original_records))
            for old, new in zip(original_records, new_records):
                # TODO - remove this hack because we don't yet write these (yet):
                for key in ["comment", "references"]:
                    if key in old.annotations and key not in new.annotations:
                        del old.annotations[key]
                self.compare_record(old, new)
        finally:
            # Do not leave the connection open behind a failed assertion
            server.close()
1162
1163
class TransferTest(SeqRecordTestBaseClass):
    """Test file -> BioSQL, BioSQL -> BioSQL."""

    # NOTE - For speed I don't bother to create a new database each time,
    # simply a new unique namespace is used for each test.

    def setUp(self):
        """Create the test database."""
        # The return value is not kept, trans() uses the module level TESTDB.
        create_database()

    def test_NC_005816(self):
        """From GenBank file to BioSQL, then again to a new namespace, NC_005816."""
        with warnings.catch_warnings():
            # BiopythonWarning: order location operators are not fully supported
            warnings.simplefilter("ignore", BiopythonWarning)
            self.trans("GenBank/NC_005816.gb", "gb")

    def test_NC_000932(self):
        """From GenBank file to BioSQL, then again to a new namespace, NC_000932."""
        self.trans("GenBank/NC_000932.gb", "gb")

    def test_NT_019265(self):
        """From GenBank file to BioSQL, then again to a new namespace, NT_019265."""
        self.trans("GenBank/NT_019265.gb", "gb")

    def test_protein_refseq2(self):
        """From GenBank file to BioSQL, then again to a new namespace, protein_refseq2."""
        with warnings.catch_warnings():
            # BiopythonWarning: order location operators are not fully supported
            warnings.simplefilter("ignore", BiopythonWarning)
            self.trans("GenBank/protein_refseq2.gb", "gb")

    def test_no_ref(self):
        """From GenBank file to BioSQL, then again to a new namespace, noref."""
        self.trans("GenBank/noref.gb", "gb")

    def test_one_of(self):
        """From GenBank file to BioSQL, then again to a new namespace, one_of."""
        self.trans("GenBank/one_of.gb", "gb")

    def test_cor6_6(self):
        """From GenBank file to BioSQL, then again to a new namespace, cor6_6."""
        self.trans("GenBank/cor6_6.gb", "gb")

    def test_arab1(self):
        """From GenBank file to BioSQL, then again to a new namespace, arab1."""
        self.trans("GenBank/arab1.gb", "gb")

    def trans(self, filename, format):
        """Load a file into one namespace, then copy the records to a second.

        Arguments:
         - filename - path of the sequence file to parse.
         - format - SeqIO format name used to parse the file.

        The records are compared with the originals after each load. The
        database connection is closed even if one of the checks fails, so
        that tearDown never runs with this connection still open.
        """
        original_records = []
        for record in SeqIO.parse(filename, format):
            # Records annotated as mRNA are loaded as DNA
            if record.annotations.get("molecule_type") == "mRNA":
                record.annotations["molecule_type"] = "DNA"
            original_records.append(record)
        # now open a connection to load the database
        server = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )
        try:
            db_name = "test_trans1_%s" % filename  # new namespace!
            db = server.new_database(db_name)
            count = db.load(original_records)
            self.assertEqual(count, len(original_records))
            server.commit()
            # Now read them back...
            biosql_records = [db.lookup(name=rec.name) for rec in original_records]
            # And check they agree
            self.compare_records(original_records, biosql_records)
            # Now write to a second name space...
            db_name = "test_trans2_%s" % filename  # new namespace!
            db = server.new_database(db_name)
            count = db.load(biosql_records)
            self.assertEqual(count, len(original_records))
            # Now read them back again,
            biosql_records2 = [db.lookup(name=rec.name) for rec in original_records]
            # And check they also agree
            self.compare_records(original_records, biosql_records2)
        finally:
            # Do not leave the connection open behind a failed assertion
            server.close()

    def tearDown(self):
        """Remove the test database."""
        destroy_database()
1244
1245
class InDepthLoadTest(unittest.TestCase):
    """Make sure we are loading and retrieving in a semi-lossless fashion."""

    def setUp(self):
        """Load cor6_6.gb and connect to the "biosql-test" namespace."""
        load_database(os.path.join(os.getcwd(), "GenBank", "cor6_6.gb"))

        connection = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )
        self.server = connection
        self.db = connection["biosql-test"]

    def tearDown(self):
        """Close the connection and remove the test database."""
        self.server.close()
        destroy_database()
        del self.db
        del self.server

    def test_transfer(self):
        """Make sure can load record into another namespace."""
        # Already in the database, so this gives a record fetched from it
        stored = self.db.lookup(accession="X55053")
        # A second namespace should accept that record
        other_namespace = self.server.new_database("biosql-test-alt")
        loaded = other_namespace.load([stored])
        self.assertEqual(loaded, 1)

    def test_reload(self):
        """Make sure can't reimport existing records."""
        path = os.path.join(os.getcwd(), "GenBank", "cor6_6.gb")
        with open(path) as stream:
            parsed = next(SeqIO.parse(stream, "gb"))
        # The first record of the file should be in the database already
        stored = self.db.lookup(accession="X55053")
        for attribute in ("id", "name", "description", "seq"):
            self.assertEqual(getattr(stored, attribute), getattr(parsed, attribute))
        # So loading it a second time has to fail
        with self.assertRaises(Exception) as context:
            self.db.load([parsed])
        # The check is on the exception's class *name* because the actual
        # exception class will depend on which DB back end is in use.
        self.assertIn(
            context.exception.__class__.__name__,
            ["IntegrityError", "UniqueViolation", "AttributeError"],
        )

    def test_record_loading(self):
        """Make sure all records are correctly loaded."""
        # accession, name, id, description, start of the sequence
        expectations = [
            (
                "X55053",
                "ATCOR66M",
                "X55053.1",
                "A.thaliana cor6.6 mRNA",
                "AACAAAACACACATCAAAAA",
            ),
            (
                "X62281",
                "ATKIN2",
                "X62281.1",
                "A.thaliana kin2 gene",
                "ATTTGGCCTA",
            ),
        ]
        for accession, name, identifier, description, prefix in expectations:
            found = self.db.lookup(accession=accession)
            self.assertEqual(found.name, name)
            self.assertEqual(found.id, identifier)
            self.assertEqual(found.description, description)
            self.assertEqual(found.annotations["molecule_type"], "DNA")
            self.assertEqual(found.seq[: len(prefix)], prefix)

    def test_seq_feature(self):
        """In depth check that SeqFeatures are transmitted through the db."""
        all_features = self.db.lookup(accession="AJ237582").features
        self.assertEqual(len(all_features), 7)

        # A feature with a single location
        source = all_features[0]
        self.assertEqual(source.type, "source")
        self.assertEqual(str(source.location), "[0:206](+)")
        self.assertEqual(len(source.qualifiers), 3)
        source_qualifiers = {
            "country": ["Russia:Bashkortostan"],
            "organism": ["Armoracia rusticana"],
            "db_xref": ["taxon:3704"],
        }
        for key, value in source_qualifiers.items():
            self.assertEqual(source.qualifiers[key], value)

        # A feature with a split location
        cds = all_features[4]
        self.assertEqual(cds.type, "CDS")
        self.assertEqual(str(cds.location), "join{[0:48](+), [142:206](+)}")
        self.assertEqual(len(cds.location.parts), 2)
        for index, text in enumerate(["[0:48](+)", "[142:206](+)"]):
            self.assertEqual(str(cds.location.parts[index]), text)
        self.assertEqual(cds.location.operator, "join")
        self.assertEqual(len(cds.qualifiers), 6)
        cds_qualifiers = {
            "gene": ["csp14"],
            "codon_start": ["2"],
            "product": ["cold shock protein"],
            "protein_id": ["CAB39890.1"],
            "db_xref": ["GI:4538893"],
            "translation": ["DKAKDAAAAAGASAQQAGKNISDAAAGGVNFVKEKTG"],
        }
        for key, value in cds_qualifiers.items():
            self.assertEqual(cds.qualifiers[key], value)

        # Strand information, using a freshly fetched copy of the record
        # XXX We should be testing complement as well
        cds = self.db.lookup(accession="AJ237582").features[4]  # DNA, no complement
        self.assertEqual(cds.strand, 1)
        for part in cds.location.parts:
            self.assertEqual(part.strand, 1)

        first = self.db.lookup(accession="X55053").features[0]
        # mRNA, so really cDNA, so the strand should be 1 (not complemented)
        self.assertEqual(first.strand, 1)
1358
1359
1360#####################################################################
1361
1362
class AutoSeqIOTests(SeqRecordTestBaseClass):
    """Test SeqIO and BioSQL together."""

    # Class level defaults, replaced per test instance in setUp
    server = None
    db = None

    @classmethod
    def setUpClass(cls):
        """Create the test database once for all the tests in this class."""
        # Create and reuse one database for all tests in this class.
        # The return value is not kept, setUp uses the module level TESTDB.
        create_database()

    def setUp(self):
        """Connect to the database."""
        db_name = "biosql-test-seqio"
        server = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )
        self.server = server
        # Only create the namespace if it is not there yet
        if db_name not in server:
            self.db = server.new_database(db_name)
            server.commit()
        self.db = self.server[db_name]

    def tearDown(self):
        """Drop the references set by setUp and close the connection."""
        # Compare with None explicitly: both objects support len(), so a
        # plain truth test may depend on how many entries they hold.
        if self.db is not None:
            del self.db
        if self.server is not None:
            self.server.close()
            del self.server

    def check(self, t_format, t_filename, t_count=1):
        """Load a file into the database and look its records up again.

        Arguments:
         - t_format - SeqIO format name used to parse the file.
         - t_filename - path of the sequence file to load.
         - t_count - number of records the load is expected to report.

        Each record is retrieved by name and display_id, and where the
        record allows it also by version, first accession and GI number.
        Every retrieved record is compared with the parsed original.
        """
        db = self.db

        records = []
        for record in SeqIO.parse(t_filename, t_format):
            molecule_type = record.annotations.get("molecule_type")
            if molecule_type is not None:
                # Reduce the molecule type to one of DNA, RNA or protein
                if "DNA" in molecule_type:
                    record.annotations["molecule_type"] = "DNA"
                elif "RNA" in molecule_type:
                    record.annotations["molecule_type"] = "RNA"
                elif "protein" in molecule_type:
                    record.annotations["molecule_type"] = "protein"
                else:
                    raise Exception("Unknown molecule type '%s'" % molecule_type)
            records.append(record)
        count = db.load(records)
        # unittest assertions rather than assert statements, which are
        # skipped when Python runs with the -O option
        self.assertEqual(count, t_count)
        self.server.commit()

        for record in records:
            key = record.name
            # print(" - Retrieving by name/display_id '%s'," % key)
            db_rec = db.lookup(name=key)
            self.compare_record(record, db_rec)
            db_rec = db.lookup(display_id=key)
            self.compare_record(record, db_rec)

            key = record.id
            # Only identifiers of the form "name.digits" are looked up by version
            if key.count(".") == 1 and key.split(".")[1].isdigit():
                # print(" - Retrieving by version '%s'," % key)
                db_rec = db.lookup(version=key)
                self.compare_record(record, db_rec)

            if "accessions" in record.annotations:
                # Only expect FIRST accession to work!
                key = record.annotations["accessions"][0]
                self.assertTrue(
                    key, "Blank accession in annotation %r" % record.annotations
                )
                if key != record.id:
                    # print(" - Retrieving by accession '%s'," % key)
                    db_rec = db.lookup(accession=key)
                    self.compare_record(record, db_rec)

            if "gi" in record.annotations:
                key = record.annotations["gi"]
                if key != record.id:
                    # print(" - Retrieving by GI '%s'," % key)
                    db_rec = db.lookup(primary_id=key)
                    self.compare_record(record, db_rec)

    def test_SeqIO_loading(self):
        """Load files in several formats and check the final record count."""
        self.check("fasta", "Fasta/lupine.nu")
        self.check("fasta", "Fasta/elderberry.nu")
        self.check("fasta", "Fasta/phlox.nu")
        self.check("fasta", "Fasta/centaurea.nu")
        self.check("fasta", "Fasta/wisteria.nu")
        self.check("fasta", "Fasta/sweetpea.nu")
        self.check("fasta", "Fasta/lavender.nu")
        self.check("fasta", "Fasta/aster.pro")
        self.check("fasta", "Fasta/loveliesbleeding.pro")
        self.check("fasta", "Fasta/rose.pro")
        self.check("fasta", "Fasta/rosemary.pro")
        self.check("fasta", "Fasta/f001")
        self.check("fasta", "Fasta/f002", 3)
        self.check("fasta", "Fasta/fa01", 2)
        self.check("fasta", "GFF/NC_001802.fna")
        self.check("fasta", "GFF/multi.fna", 3)
        self.check("fasta", "Registry/seqs.fasta", 2)
        self.check("swiss", "SwissProt/sp001")
        self.check("swiss", "SwissProt/sp002")
        self.check("swiss", "SwissProt/sp003")
        self.check("swiss", "SwissProt/P0A186.txt")
        self.check("swiss", "SwissProt/sp005")
        self.check("swiss", "SwissProt/sp006")
        self.check("swiss", "SwissProt/sp007")
        self.check("swiss", "SwissProt/sp008")
        self.check("swiss", "SwissProt/sp009")
        self.check("swiss", "SwissProt/sp010")
        self.check("swiss", "SwissProt/sp011")
        self.check("swiss", "SwissProt/sp012")
        self.check("swiss", "SwissProt/sp013")
        self.check("swiss", "SwissProt/P60137.txt")
        self.check("swiss", "SwissProt/sp015")
        self.check("swiss", "SwissProt/sp016")
        self.check("swiss", "Registry/EDD_RAT.dat")
        self.check("genbank", "GenBank/noref.gb")
        self.check("genbank", "GenBank/cor6_6.gb", 6)
        self.check("genbank", "GenBank/iro.gb")
        self.check("genbank", "GenBank/pri1.gb")
        self.check("genbank", "GenBank/arab1.gb")
        with warnings.catch_warnings():
            # BiopythonWarning: order location operators are not fully
            # supported
            warnings.simplefilter("ignore", BiopythonWarning)
            self.check("genbank", "GenBank/protein_refseq2.gb")
        self.check("genbank", "GenBank/extra_keywords.gb")
        self.check("genbank", "GenBank/one_of.gb")
        self.check("genbank", "GenBank/NT_019265.gb")
        self.check("genbank", "GenBank/origin_line.gb")
        self.check("genbank", "GenBank/blank_seq.gb")
        with warnings.catch_warnings():
            # BiopythonWarning: bond location operators are not fully supported
            warnings.simplefilter("ignore", BiopythonWarning)
            self.check("genbank", "GenBank/dbsource_wrap.gb")
            # BiopythonWarning: order location operators are not fully
            # supported
            self.check("genbank", "GenBank/NC_005816.gb")
        self.check("genbank", "GenBank/gbvrl1_start.seq", 3)
        self.check("genbank", "GFF/NC_001422.gbk")
        self.check("embl", "EMBL/TRBG361.embl")
        self.check("embl", "EMBL/DD231055_edited.embl")
        self.check("embl", "EMBL/SC10H5.embl")
        self.check("embl", "EMBL/U87107.embl")
        # Total number of records loaded by all the calls above
        self.assertEqual(len(self.db), 66)
1507
1508
class SwissProtUnknownPositionTest(unittest.TestCase):
    """Handle SwissProt unknown position by setting value to null in database.

    A UniProt XML record containing features with an unknown start or end
    is loaded into BioSQL and read back; the retrieved feature locations
    must still report the unknown positions as UnknownPosition objects.
    """

    def setUp(self):
        """Create a new test database and an empty "biosql-test" namespace."""
        # drop any old database and create a new one:
        TESTDB = create_database()
        # connect to new database:
        self.server = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )
        # Create new namespace within new empty database:
        self.db = self.server.new_database("biosql-test")

    def tearDown(self):
        """Roll back, close the connection and call destroy_database()."""
        # Nothing loaded by the test is meant to be kept, so roll back
        # before disconnecting.
        self.server.rollback()
        self.server.close()
        destroy_database()
        del self.db
        del self.server

    def test_ambiguous_location(self):
        """Loaded uniprot-xml with ambiguous location in BioSQL."""
        # Called "accession" rather than "id" so that the builtin id() is
        # not shadowed inside this method.
        accession = "P97881"
        seqiter = SeqIO.parse("SwissProt/%s.xml" % accession, "uniprot-xml")
        # Exactly one record is expected to be loaded from the file.
        self.assertEqual(self.db.load(seqiter), 1)

        dbrecord = self.db.lookup(primary_id=accession)
        for feature in dbrecord.features:
            if feature.type == "signal peptide":
                # The signal peptide is expected to have an unknown end.
                self.assertIsInstance(feature.location.end, UnknownPosition)
            elif feature.type == "chain":
                # The chain is expected to have an unknown start.
                self.assertIsInstance(feature.location.start, UnknownPosition)
            else:
                # Every other feature is expected to have an exact start.
                self.assertIsInstance(feature.location.start, ExactPosition)
1543
1544
class TestBaseClassMethods(unittest.TestCase):
    """Test if methods from the Bio.Seq base class are called correctly.

    Every test compares the sequence retrieved from the BioSQL database
    (``self.seq1``) against the sequence of the same accession parsed
    directly from the GenBank file (``self.seq2``).
    """

    def setUp(self):
        """Load a database.

        Sets ``self.seq1`` to the sequence looked up in the database and
        ``self.seq2`` to the sequence of the same accession read from the
        GenBank file.

        Raises RuntimeError if the accession is not present in the file.
        """
        path = "GenBank/cor6_6.gb"
        accession = "X62281"
        load_database(path)

        self.server = BioSeqDatabase.open_database(
            driver=DBDRIVER, user=DBUSER, passwd=DBPASSWD, host=DBHOST, db=TESTDB
        )
        self.db = self.server["biosql-test"]
        self.seq1 = self.db.lookup(accession=accession).seq
        # Open the file ourselves: iteration stops as soon as the wanted
        # record is found, and the with-block guarantees the handle is
        # closed instead of leaving that to garbage collection.
        with open(path) as handle:
            for record in SeqIO.parse(handle, "genbank"):
                if accession in record.annotations["accessions"]:
                    break
            else:
                raise RuntimeError(
                    "Failed to find accession %s in GenBank file" % accession
                )
        self.seq2 = record.seq

    def tearDown(self):
        """Close the connection, call destroy_database() and drop references."""
        self.server.close()
        destroy_database()
        del self.db
        del self.seq1
        del self.seq2
        del self.server

    def test_bytes(self):
        """Check bytes() of the database sequence."""
        b = bytes(self.seq1)
        self.assertIsInstance(b, bytes)
        self.assertEqual(len(b), 880)
        self.assertEqual(b, bytes(self.seq2))

    def test_hash(self):
        """Check both sequences hash to the same value."""
        self.assertEqual(hash(self.seq1), hash(self.seq2))

    def test_add(self):
        """Check adding a string on the right gives an equal Seq."""
        self.assertIsInstance(self.seq1 + "ABCD", Seq)
        self.assertEqual(self.seq1 + "ABCD", self.seq2 + "ABCD")

    def test_radd(self):
        """Check adding a string on the left gives an equal Seq."""
        self.assertIsInstance("ABCD" + self.seq1, Seq)
        self.assertEqual("ABCD" + self.seq1, "ABCD" + self.seq2)

    def test_mul(self):
        """Check multiplication by an integer from either side."""
        self.assertIsInstance(2 * self.seq1, Seq)
        self.assertEqual(2 * self.seq1, 2 * self.seq2)
        self.assertIsInstance(self.seq1 * 2, Seq)
        self.assertEqual(self.seq1 * 2, self.seq2 * 2)

    def test_contains(self):
        """Check the ``in`` operator for a present and an absent substring."""
        for seq in (self.seq1, self.seq2):
            self.assertIn("CCTTAAGCCCA", seq)
            self.assertNotIn("ACGTACGT", seq)

    def test_repr(self):
        """Check repr() returns the same string for both sequences."""
        self.assertIsInstance(repr(self.seq1), str)
        self.assertEqual(repr(self.seq1), repr(self.seq2))

    def test_str(self):
        """Check str() returns the same string for both sequences."""
        self.assertIsInstance(str(self.seq1), str)
        self.assertEqual(str(self.seq1), str(self.seq2))

    def test_count(self):
        """Check count() without bounds, with a start, and with start and end."""
        self.assertEqual(self.seq1.count("CT"), self.seq2.count("CT"))
        self.assertEqual(self.seq1.count("CT", 75), self.seq2.count("CT", 75))
        self.assertEqual(
            self.seq1.count("CT", 125, 250), self.seq2.count("CT", 125, 250)
        )

    def test_find(self):
        """Check find() with the different start/end combinations."""
        self.assertEqual(self.seq1.find("CT"), self.seq2.find("CT"))
        self.assertEqual(self.seq1.find("CT", 75), self.seq2.find("CT", 75))
        self.assertEqual(self.seq1.find("CG", 75, 100), self.seq2.find("CG", 75, 100))
        self.assertEqual(
            self.seq1.find("CT", None, 100), self.seq2.find("CT", None, 100)
        )

    def test_rfind(self):
        """Check rfind() with the different start/end combinations."""
        self.assertEqual(self.seq1.rfind("CT"), self.seq2.rfind("CT"))
        self.assertEqual(self.seq1.rfind("CT", 450), self.seq2.rfind("CT", 450))
        self.assertEqual(
            self.seq1.rfind("CT", None, 100), self.seq2.rfind("CT", None, 100)
        )
        self.assertEqual(self.seq1.rfind("CT", 75, 100), self.seq2.rfind("CT", 75, 100))

    def test_index(self):
        """Check index(), including ValueError when the substring is absent."""
        self.assertEqual(self.seq1.index("CT"), self.seq2.index("CT"))
        self.assertEqual(self.seq1.index("CT", 75), self.seq2.index("CT", 75))
        self.assertEqual(
            self.seq1.index("CT", None, 100), self.seq2.index("CT", None, 100)
        )
        for seq in (self.seq1, self.seq2):
            # Start-only failing search, mirroring test_rindex: index and
            # rindex raise under the same condition (substring absent from
            # the searched range), so this must raise wherever the rindex
            # check does. It replaces an exact duplicate of the next line.
            self.assertRaises(ValueError, seq.index, "AG", 850)
            self.assertRaises(ValueError, seq.index, "CG", 75, 100)

    def test_rindex(self):
        """Check rindex(), including ValueError when the substring is absent."""
        self.assertEqual(self.seq1.rindex("CT"), self.seq2.rindex("CT"))
        self.assertEqual(
            self.seq1.rindex("CT", None, 100), self.seq2.rindex("CT", None, 100)
        )
        for seq in (self.seq1, self.seq2):
            self.assertRaises(ValueError, seq.rindex, "AG", 850)
            self.assertRaises(ValueError, seq.rindex, "CG", 75, 100)

    def test_startswith(self):
        """Check startswith() with and without start/end keyword arguments."""
        for seq in (self.seq1, self.seq2):
            self.assertTrue(seq.startswith("ATTT"))
            self.assertTrue(seq.startswith("TAAA", start=10))
            self.assertTrue(seq.startswith("TAAA", start=10, end=14))
            # The window 10:12 is shorter than the prefix, so no match.
            self.assertFalse(seq.startswith("TAAA", start=10, end=12))

    def test_endswith(self):
        """Check endswith() with and without positional start/end arguments."""
        for seq in (self.seq1, self.seq2):
            self.assertTrue(seq.endswith("TATA"))
            self.assertTrue(seq.endswith("TATA", 876))
            self.assertTrue(seq.endswith("ATTA", 872, 878))
            # The window 876:878 is shorter than the suffix, so no match.
            self.assertFalse(seq.endswith("ATTA", 876, 878))

    def test_split(self):
        """Check split() with default, explicit separator, and maxsplit."""
        self.assertEqual(self.seq1.split(), self.seq2.split())
        self.assertEqual(self.seq1.split("C"), self.seq2.split("C"))
        self.assertEqual(self.seq1.split("C", 1), self.seq2.split("C", 1))

    def test_rsplit(self):
        """Check rsplit() with default, explicit separator, and maxsplit."""
        self.assertEqual(self.seq1.rsplit(), self.seq2.rsplit())
        self.assertEqual(self.seq1.rsplit("C"), self.seq2.rsplit("C"))
        self.assertEqual(self.seq1.rsplit("C", 1), self.seq2.rsplit("C", 1))

    def test_strip(self):
        """Check strip() removing "G" from both ends."""
        self.assertEqual(self.seq1.strip("G"), self.seq2.strip("G"))

    def test_lstrip(self, chars=None):
        """Check lstrip() removing leading "G".

        The ``chars`` parameter is unused; it is retained only so that the
        method signature stays unchanged.
        """
        self.assertEqual(self.seq1.lstrip("G"), self.seq2.lstrip("G"))

    def test_rstrip(self, chars=None):
        """Check rstrip() removing trailing "G".

        The ``chars`` parameter is unused; it is retained only so that the
        method signature stays unchanged.
        """
        self.assertEqual(self.seq1.rstrip("G"), self.seq2.rstrip("G"))

    def test_upper(self):
        """Check upper() gives equal results."""
        self.assertEqual(self.seq1.upper(), self.seq2.upper())

    def test_lower(self):
        """Check lower() gives equal results."""
        self.assertEqual(self.seq1.lower(), self.seq2.lower())

    def test_replace(self):
        """Check replace on the sequence data, exercised via transcribe()."""
        # seq.transcribe uses seq._data.replace
        self.assertEqual(self.seq1.transcribe(), self.seq2.transcribe())

    def test_translate(self):
        """Check translate on the sequence data, exercised via complement()."""
        # seq.complement uses seq._data.translate
        self.assertEqual(self.seq1.complement(), self.seq2.complement())
1701