1#!/usr/bin/env python
2
3###############################################################################
4#                                                                             #
5#    This program is free software: you can redistribute it and/or modify     #
6#    it under the terms of the GNU General Public License as published by     #
7#    the Free Software Foundation, either version 3 of the License, or        #
8#    (at your option) any later version.                                      #
9#                                                                             #
10#    This program is distributed in the hope that it will be useful,          #
11#    but WITHOUT ANY WARRANTY; without even the implied warranty of           #
12#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the            #
13#    GNU General Public License for more details.                             #
14#                                                                             #
15#    You should have received a copy of the GNU General Public License        #
16#    along with this program. If not, see <http://www.gnu.org/licenses/>.     #
17#                                                                             #
18###############################################################################
19
20"""
21Test stability of marker set for different named taxonomic groups.
22"""
23
24__author__ = 'Donovan Parks'
25__copyright__ = 'Copyright 2013'
26__credits__ = ['Donovan Parks']
27__license__ = 'GPL3'
28__version__ = '1.0.0'
29__maintainer__ = 'Donovan Parks'
30__email__ = 'donovan.parks@gmail.com'
31__status__ = 'Development'
32
33import sys
34import argparse
35import random
36import multiprocessing as mp
37
38from checkm.lib.img import IMG
39from lib.markerSet import MarkerSet
40
41from numpy import mean, std
42
43class MarkerSetStabilityTest(object):
44    def __init__(self):
45        self.img = IMG()
46        self.markerset = MarkerSet()
47
48    def __processLineage(self, metadata, ubiquityThreshold, singleCopyThreshold, minGenomes, queueIn, queueOut):
49        """Assess stability of marker set for a specific named taxonomic group."""
50        while True:
51            lineage = queueIn.get(block=True, timeout=None)
52            if lineage == None:
53                break
54
55            genomeIds = self.img.genomeIdsByTaxonomy(lineage, metadata, 'trusted')
56
57            markerGenes = []
58            perChange = []
59            numGenomesToSelect = int(0.9*len(genomeIds))
60            if len(genomeIds) >= minGenomes:
61                # calculate marker set for all genomes in lineage
62                geneCountTable = self.img.geneCountTable(genomeIds)
63                markerGenes = self.markerset.markerGenes(genomeIds, geneCountTable, ubiquityThreshold*len(genomeIds), singleCopyThreshold*len(genomeIds))
64                tigrToRemove = self.img.identifyRedundantTIGRFAMs(markerGenes)
65
66                markerGenes = markerGenes - tigrToRemove
67
68                for _ in xrange(0, 100):
69                    # calculate marker set for subset of genomes
70                    subsetGenomeIds = random.sample(genomeIds, numGenomesToSelect)
71                    geneCountTable = self.img.geneCountTable(subsetGenomeIds)
72                    subsetMarkerGenes = self.markerset.markerGenes(subsetGenomeIds, geneCountTable, ubiquityThreshold*numGenomesToSelect, singleCopyThreshold*numGenomesToSelect)
73                    tigrToRemove = self.img.identifyRedundantTIGRFAMs(subsetMarkerGenes)
74                    subsetMarkerGenes = subsetMarkerGenes - tigrToRemove
75
76                    perChange.append(float(len(markerGenes.symmetric_difference(subsetMarkerGenes)))*100.0 / len(markerGenes))
77
78            if perChange != []:
79                queueOut.put((lineage, len(genomeIds), len(markerGenes), numGenomesToSelect, mean(perChange), std(perChange)))
80            else:
81                queueOut.put((lineage, len(genomeIds), len(markerGenes), numGenomesToSelect, -1, -1))
82
83    def __storeResults(self, outputFile, totalLineages, writerQueue):
84        """Store results to file."""
85
86        fout = open(outputFile, 'w')
87        fout.write('Lineage\t# genomes\t# markers\t# sampled genomes\tmean % change\tstd % change\n')
88
89        numProcessedLineages = 0
90        while True:
91            lineage, numGenomes, numMarkerGenes, numSampledGenomes, meanPerChange, stdPerChange = writerQueue.get(block=True, timeout=None)
92            if lineage == None:
93                break
94
95            numProcessedLineages += 1
96            statusStr = '    Finished processing %d of %d (%.2f%%) lineages.' % (numProcessedLineages, totalLineages, float(numProcessedLineages)*100/totalLineages)
97            sys.stdout.write('%s\r' % statusStr)
98            sys.stdout.flush()
99
100
101            fout.write('%s\t%d\t%d\t%d\t%f\t%f\n' % (lineage, numGenomes, numMarkerGenes, numSampledGenomes, meanPerChange, stdPerChange))
102
103        sys.stdout.write('\n')
104
105        fout.close()
106
107
108    def run(self, outputFile, ubiquityThreshold, singleCopyThreshold, minGenomes, mostSpecificRank, numThreads):
109        """Calculate stability of marker sets for named taxonomic groups."""
110
111        print '  Testing stability of marker sets:'
112
113        random.seed(1)
114
115        # process each sequence in parallel
116        workerQueue = mp.Queue()
117        writerQueue = mp.Queue()
118
119        metadata = self.img.genomeMetadata()
120        lineages = self.img.lineagesByCriteria(metadata, minGenomes, mostSpecificRank)
121
122        for lineage in lineages:
123            workerQueue.put(lineage)
124
125        for _ in range(numThreads):
126            workerQueue.put(None)
127
128        calcProc = [mp.Process(target = self.__processLineage, args = (metadata, ubiquityThreshold, singleCopyThreshold, minGenomes, workerQueue, writerQueue)) for _ in range(numThreads)]
129        writeProc = mp.Process(target = self.__storeResults, args = (outputFile, len(lineages), writerQueue))
130
131        writeProc.start()
132
133        for p in calcProc:
134            p.start()
135
136        for p in calcProc:
137            p.join()
138
139        writerQueue.put((None, None, None, None, None, None))
140        writeProc.join()
141
142if __name__ == '__main__':
143    parser = argparse.ArgumentParser(description="assess stability of marker sets",
144                                      formatter_class=argparse.ArgumentDefaultsHelpFormatter)
145
146    parser.add_argument('output_file', help='Output file.')
147    parser.add_argument('-u', '--ubiquity', help='Ubiquity threshold for defining marker set', type=float, default = 0.97)
148    parser.add_argument('-s', '--single_copy', help='Single-copy threshold for defining marker set', type=float, default = 0.97)
149    parser.add_argument('-m', '--min_genomes', help='Minimum genomes required to include in analysis', type=int, default = 10)
150    parser.add_argument('-r', '--most_specific_rank', help='Most specific rank to include in analysis', type=int, default = 6)
151    parser.add_argument('-t', '--threads', help='Threads to use', type=int, default = 32)
152
153    args = parser.parse_args()
154
155    markerSetStabilityTest = MarkerSetStabilityTest()
156    markerSetStabilityTest.run(args.output_file, args.ubiquity, args.single_copy, args.min_genomes, args.most_specific_rank, args.threads)
157