#!/usr/bin/env python

###############################################################################
#                                                                             #
#    This program is free software: you can redistribute it and/or modify     #
#    it under the terms of the GNU General Public License as published by     #
#    the Free Software Foundation, either version 3 of the License, or        #
#    (at your option) any later version.                                      #
#                                                                             #
#    This program is distributed in the hope that it will be useful,          #
#    but WITHOUT ANY WARRANTY; without even the implied warranty of           #
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the            #
#    GNU General Public License for more details.                             #
#                                                                             #
#    You should have received a copy of the GNU General Public License        #
#    along with this program. If not, see <http://www.gnu.org/licenses/>.     #
#                                                                             #
###############################################################################

__prog_name__ = 'TaxonomicMarkerSets'
__prog_desc__ = 'create taxonomic-specific marker sets'

__author__ = 'Donovan Parks'
__copyright__ = 'Copyright 2013'
__credits__ = ['Donovan Parks']
__license__ = 'GPL3'
__version__ = '0.0.1'
__maintainer__ = 'Donovan Parks'
__email__ = 'donovan.parks@gmail.com'
__status__ = 'Development'

import os
import sys
import argparse
import multiprocessing as mp

from checkm.util.img import IMG
from checkm.util.taxonomyUtils import rankPrefixes, ranksByLevel
from lib.markerSetBuilder import MarkerSetBuilder

class TaxonomicMarkerSets(object):
    def __init__(self):
        pass

    def __workerThread(self, ubiquityThreshold, singleCopyThreshold,
                       minGenomes,
                       colocatedDistThreshold, colocatedGenomeThreshold,
                       metadata,
                       queueIn, queueOut):
        """Process each lineage in parallel."""

        img = IMG('/srv/whitlam/bio/db/checkm/img/img_metadata.tsv', '/srv/whitlam/bio/db/checkm/pfam/tigrfam2pfam.tsv')
        markerSetBuilder = MarkerSetBuilder()

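        # consume lineages from the input queue until a None sentinel arrives;
        # a marker set is only built for lineages with at least minGenomes genomes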
        while True:
            lineage = queueIn.get(block=True, timeout=None)
            if lineage is None:
                break

            if lineage == 'Universal':
                genomeIds = img.genomeIdsByTaxonomy('prokaryotes', metadata)
            else:
                genomeIds = img.genomeIdsByTaxonomy(lineage, metadata)

            if len(genomeIds) >= minGenomes:
                markerSet = markerSetBuilder.buildMarkerSet(genomeIds, ubiquityThreshold, singleCopyThreshold, colocatedDistThreshold)
                colocatedSets = markerSet.markerSet
            else:
                colocatedSets = None

            # allow results to be processed or written to file
            queueOut.put((lineage, colocatedSets, len(genomeIds)))

    def __writerThread(self, pfamIdToPfamAcc,
                       ubiquityThreshold, singleCopyThreshold,
                       colocatedDistThreshold, colocatedGenomeThreshold,
                       outputDir, numDataItems, writerQueue):
        """Store or write results of worker threads in a single thread."""

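        # a single writer process handles all file output so worker
        # processes never write to the same file concurrently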
        #taxonSetOut = open(os.path.join('..', 'data', 'taxon_marker_sets.tsv'), 'w')
        taxonSetOut = open(os.path.join('.', 'data', 'taxon_marker_sets.tsv'), 'w')
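        # columns written per lineage: rank, taxon, lineage, num genomes, num marker genes,
        # num marker sets, max set size, mean set size, marker sets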

        processedItems = 0
        while True:
            lineage, colocatedSets, numGenomes = writerQueue.get(block=True, timeout=None)
            if lineage is None:
                break

            processedItems += 1
            statusStr = 'Finished processing %d of %d (%.2f%%) lineages.' % (processedItems, numDataItems, float(processedItems)*100/numDataItems)
            sys.stdout.write('%s\r' % statusStr)
            sys.stdout.flush()

            if colocatedSets is not None:
                taxonomy = [x.strip() for x in lineage.split(';')]
                rankPrefix = rankPrefixes[len(taxonomy)-1]

                cladeName = taxonomy[-1].strip().replace(' ', '_')
                fout = open(os.path.join(outputDir, rankPrefix + cladeName + '.txt'), 'w')
                fout.write('# Taxonomic Marker Set\n')
                fout.write('LINEAGE\t' + lineage + '\n')
                fout.write('GENOME\t' + str(numGenomes) + '\n')
                fout.write('UBIQUITY\t' + str(ubiquityThreshold) + '\n')
                fout.write('SINGLE_COPY\t' + str(singleCopyThreshold) + '\n')
                fout.write('COLOCATED_DISTANCE\t' + str(colocatedDistThreshold) + '\n')
                fout.write('COLOCATED_GENOME_PERCENTAGE\t' + str(colocatedGenomeThreshold) + '\n')

                # change model names to accession numbers and make sure
                # there is an HMM model for each Pfam identifier
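                # e.g., a hypothetical internal id 'pfam00318' becomes 'PF00318' and is then
                # mapped to its versioned accession (e.g., 'PF00318.17') parsed from Pfam-A.hmm;
                # ids without a matching accession are dropped from the marker set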
                mungedColocatedSets = []
                setSizes = []
                for geneSet in colocatedSets:
                    s = set()
                    for geneId in geneSet:
                        if 'pfam' in geneId:
                            pfamId = geneId.replace('pfam', 'PF')
                            if pfamId in pfamIdToPfamAcc:
                                s.add(pfamIdToPfamAcc[pfamId])
                        else:
                            s.add(geneId)

                    setSizes.append(len(s))
                    mungedColocatedSets.append(s)

                fout.write(str(mungedColocatedSets))
                fout.close()

                # write marker set for this lineage to the single, combined taxon marker set file
                numMarkerGenes = 0
                for m in mungedColocatedSets:
                    numMarkerGenes += len(m)

                taxon = taxonomy[-1]
                if len(taxonomy) == 7:
                    taxon = taxonomy[5] + ' ' + taxonomy[6]

                maxSetSize = max(setSizes)
                avgSetSize = float(sum(setSizes)) / len(setSizes)
                taxonSetOut.write('\t'.join([ranksByLevel[len(taxonomy)-1],
                                             taxon,
                                             lineage,
                                             str(numGenomes),
                                             str(numMarkerGenes),
                                             str(len(mungedColocatedSets)),
                                             str(maxSetSize),
                                             str(avgSetSize),
                                             str(mungedColocatedSets)]) + '\n')

        sys.stdout.write('\n')
        taxonSetOut.close()

    def __pfamIdToPfamAcc(self, img):
        """Map Pfam identifiers to the versioned accessions (e.g., PF00001 -> PF00001.21) listed in Pfam-A.hmm."""
        pfamIdToPfamAcc = {}
        for line in open('/srv/whitlam/bio/db/pfam/27/Pfam-A.hmm'):
            if line.startswith('ACC'):
                acc = line.split()[1].strip()
                pfamId = acc.split('.')[0]

                pfamIdToPfamAcc[pfamId] = acc

        return pfamIdToPfamAcc

    def run(self, outputDir, ubiquityThreshold, singleCopyThreshold, minGenomes, colocatedDistThreshold, colocatedGenomeThreshold, threads):
        if not os.path.exists(outputDir):
            os.makedirs(outputDir)

        # determine lineages to process
        img = IMG('/srv/whitlam/bio/db/checkm/img/img_metadata.tsv', '/srv/whitlam/bio/db/checkm/pfam/tigrfam2pfam.tsv')
        metadata = img.genomeMetadata()
        lineages = img.lineagesSorted(metadata)
        lineages.append('Universal')
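        # the special 'Universal' lineage is resolved by the workers to
        # the set of all prokaryotic genomes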

        # determine HMM model accession numbers
        pfamIdToPfamAcc = self.__pfamIdToPfamAcc(img)

        # populate worker queue with data to process
        workerQueue = mp.Queue()
        writerQueue = mp.Queue()

        for lineage in lineages:
            workerQueue.put(lineage)

        for _ in range(threads):
            workerQueue.put(None)
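        # one None sentinel per worker so each worker exits once all lineages have been processed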

        workerProc = [mp.Process(target=self.__workerThread,
                                 args=(ubiquityThreshold, singleCopyThreshold,
                                       minGenomes,
                                       colocatedDistThreshold, colocatedGenomeThreshold,
                                       metadata,
                                       workerQueue, writerQueue)) for _ in range(threads)]
        writeProc = mp.Process(target=self.__writerThread,
                               args=(pfamIdToPfamAcc,
                                     ubiquityThreshold, singleCopyThreshold,
                                     colocatedDistThreshold, colocatedGenomeThreshold,
                                     outputDir, len(lineages), writerQueue))

        writeProc.start()

        for p in workerProc:
            p.start()

        for p in workerProc:
            p.join()

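        # all workers are done; send a final sentinel so the writer
        # process exits and closes its output files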
        writerQueue.put((None, None, None))
        writeProc.join()

if __name__ == '__main__':
    print __prog_name__ + ' v' + __version__ + ': ' + __prog_desc__
    print '  by ' + __author__ + ' (' + __email__ + ')' + '\n'

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('output_dir', help='output directory')
    parser.add_argument('-u', '--ubiquity', help='ubiquity threshold for defining marker set', type=float, default=0.97)
    parser.add_argument('-s', '--single_copy', help='single-copy threshold for defining marker set', type=float, default=0.97)
    parser.add_argument('-m', '--min_genomes', help='minimum number of genomes required to infer a marker set', type=int, default=2)
    parser.add_argument('-d', '--distance_threshold', help='genomic distance within which genes are considered co-located', type=float, default=5000)
    parser.add_argument('-g', '--genome_threshold', help='percentage of genomes in which genes must be co-located', type=float, default=0.95)
    parser.add_argument('-t', '--threads', type=int, help='number of threads', default=1)

    args = parser.parse_args()

    try:
        taxonomicMarkerSets = TaxonomicMarkerSets()
        taxonomicMarkerSets.run(args.output_dir, args.ubiquity, args.single_copy, args.min_genomes, args.distance_threshold, args.genome_threshold, args.threads)
    except SystemExit:
        print "\nControlled exit resulting from an unrecoverable error or warning."
    except:
        print "\nUnexpected error:", sys.exc_info()[0]
        raise