1#!/usr/bin/env python 2 3############################################################################### 4# # 5# This program is free software: you can redistribute it and/or modify # 6# it under the terms of the GNU General Public License as published by # 7# the Free Software Foundation, either version 3 of the License, or # 8# (at your option) any later version. # 9# # 10# This program is distributed in the hope that it will be useful, # 11# but WITHOUT ANY WARRANTY; without even the implied warranty of # 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # 13# GNU General Public License for more details. # 14# # 15# You should have received a copy of the GNU General Public License # 16# along with this program. If not, see <http://www.gnu.org/licenses/>. # 17# # 18############################################################################### 19 20__prog_name__ = 'TaxonomicMarkerSets' 21__prog_desc__ = 'create taxonomic-specific marker sets' 22 23__author__ = 'Donovan Parks' 24__copyright__ = 'Copyright 2013' 25__credits__ = ['Donovan Parks'] 26__license__ = 'GPL3' 27__version__ = '0.0.1' 28__maintainer__ = 'Donovan Parks' 29__email__ = 'donovan.parks@gmail.com' 30__status__ = 'Development' 31 32import os 33import sys 34import argparse 35import multiprocessing as mp 36 37from checkm.util.img import IMG 38from checkm.util.taxonomyUtils import rankPrefixes, ranksByLevel 39from lib.markerSetBuilder import MarkerSetBuilder 40 41class TaxonomicMarkerSets(object): 42 def __init__(self): 43 pass 44 45 def __workerThread(self, ubiquityThreshold, singleCopyThreshold, 46 minGenomes, 47 colocatedDistThreshold, colocatedGenomeThreshold, 48 metadata, 49 queueIn, queueOut): 50 """Process each data item in parallel.""" 51 52 img = IMG('/srv/whitlam/bio/db/checkm/img/img_metadata.tsv', '/srv/whitlam/bio/db/checkm/pfam/tigrfam2pfam.tsv') 53 markerSetBuilder = MarkerSetBuilder() 54 55 while True: 56 lineage = queueIn.get(block=True, timeout=None) 57 if lineage == None: 58 break 59 60 if lineage == 'Universal': 61 genomeIds = img.genomeIdsByTaxonomy('prokaryotes', metadata) 62 else: 63 genomeIds = img.genomeIdsByTaxonomy(lineage, metadata) 64 if len(genomeIds) >= minGenomes: 65 markerSet = markerSetBuilder.buildMarkerSet(genomeIds, ubiquityThreshold, singleCopyThreshold, colocatedDistThreshold) 66 colocatedSets = markerSet.markerSet 67 else: 68 colocatedSets = None 69 70 # allow results to be processed or written to file 71 queueOut.put((lineage, colocatedSets, len(genomeIds))) 72 73 def __writerThread(self, pfamIdToPfamAcc, 74 ubiquityThreshold, singleCopyThreshold, 75 colocatedDistThreshold, colocatedGenomeThreshold, 76 outputDir, numDataItems, writerQueue): 77 """Store or write results of worker threads in a single thread.""" 78 79 #taxonSetOut = open(os.path.join('..', 'data', 'taxon_marker_sets.tsv'), 'w') 80 taxonSetOut = open(os.path.join('.', 'data', 'taxon_marker_sets.tsv'), 'w') 81 82 processedItems = 0 83 while True: 84 lineage, colocatedSets, numGenomes = writerQueue.get(block=True, timeout=None) 85 if lineage == None: 86 break 87 88 processedItems += 1 89 statusStr = 'Finished processing %d of %d (%.2f%%) lineages.' % (processedItems, numDataItems, float(processedItems)*100/numDataItems) 90 sys.stdout.write('%s\r' % statusStr) 91 sys.stdout.flush() 92 93 if colocatedSets != None: 94 taxonomy = [x.strip() for x in lineage.split(';')] 95 rankPrefix = rankPrefixes[len(taxonomy)-1] 96 97 cladeName = taxonomy[-1].strip().replace(' ', '_') 98 fout = open(os.path.join(outputDir, rankPrefix + cladeName + '.txt'), 'w') 99 fout.write('# Taxonomic Marker Set\n') 100 fout.write('LINEAGE\t' + lineage + '\n') 101 fout.write('GENOME\t' + str(numGenomes) + '\n') 102 fout.write('UBIQUITY\t' + str(ubiquityThreshold) + '\n') 103 fout.write('SINGLE_COPY\t' + str(singleCopyThreshold) + '\n') 104 fout.write('COLOCATED_DISTANCE\t' + str(colocatedDistThreshold) + '\n') 105 fout.write('COLOCATED_GENOME_PERCENTAGE\t' + str(colocatedGenomeThreshold) + '\n') 106 107 # change model names to accession numbers, and make 108 # sure there is an HMM model for each PFAM 109 mungedColocatedSets = [] 110 setSizes = [] 111 for geneSet in colocatedSets: 112 s = set() 113 for geneId in geneSet: 114 if 'pfam' in geneId: 115 pfamId = geneId.replace('pfam', 'PF') 116 if pfamId in pfamIdToPfamAcc: 117 s.add(pfamIdToPfamAcc[pfamId]) 118 else: 119 s.add(geneId) 120 121 setSizes.append(len(s)) 122 mungedColocatedSets.append(s) 123 124 fout.write(str(mungedColocatedSets)) 125 fout.close() 126 127 # write out single taxonomic-specific marker set file 128 numMarkerGenes = 0 129 for m in mungedColocatedSets: 130 numMarkerGenes += len(m) 131 132 taxon = taxonomy[-1] 133 if len(taxonomy) == 7: 134 taxon = taxonomy[5] + ' ' + taxonomy[6] 135 136 maxSetSize = max(setSizes) 137 avgSetSize = float(sum(setSizes))/len(setSizes) 138 taxonSetOut.write(ranksByLevel[len(taxonomy)-1] + '\t' + taxon + '\t' + lineage + '\t' + str(numGenomes) + '\t' + str(numMarkerGenes) + '\t' + str(len(mungedColocatedSets)) + '\t' + str(maxSetSize) + '\t' + str(avgSetSize) + '\t' + str(mungedColocatedSets) + '\n') 139 140 sys.stdout.write('\n') 141 taxonSetOut.close() 142 143 def __pfamIdToPfamAcc(self, img): 144 pfamIdToPfamAcc = {} 145 for line in open('/srv/whitlam/bio/db/pfam/27/Pfam-A.hmm'): 146 if 'ACC' in line: 147 acc = line.split()[1].strip() 148 pfamId = acc.split('.')[0] 149 150 pfamIdToPfamAcc[pfamId] = acc 151 152 return pfamIdToPfamAcc 153 154 def run(self, outputDir, ubiquityThreshold, singleCopyThreshold, minGenomes, colocatedDistThreshold, colocatedGenomeThreshold, threads): 155 if not os.path.exists(outputDir): 156 os.makedirs(outputDir) 157 158 # determine lineages to process 159 img = IMG('/srv/whitlam/bio/db/checkm/img/img_metadata.tsv', '/srv/whitlam/bio/db/checkm/pfam/tigrfam2pfam.tsv') 160 metadata = img.genomeMetadata() 161 lineages = img.lineagesSorted(metadata) 162 lineages.append('Universal') 163 164 # determine HMM model accession numbers 165 pfamIdToPfamAcc = self.__pfamIdToPfamAcc(img) 166 167 # populate worker queue with data to process 168 workerQueue = mp.Queue() 169 writerQueue = mp.Queue() 170 171 for lineage in lineages: 172 workerQueue.put(lineage) 173 174 for _ in range(threads): 175 workerQueue.put(None) 176 177 workerProc = [mp.Process(target = self.__workerThread, args = (ubiquityThreshold, singleCopyThreshold, 178 minGenomes, 179 colocatedDistThreshold, colocatedGenomeThreshold, 180 metadata, 181 workerQueue, writerQueue)) for _ in range(threads)] 182 writeProc = mp.Process(target = self.__writerThread, args = (pfamIdToPfamAcc, 183 ubiquityThreshold, singleCopyThreshold, 184 colocatedDistThreshold, colocatedGenomeThreshold, 185 outputDir, len(lineages), writerQueue)) 186 187 writeProc.start() 188 189 for p in workerProc: 190 p.start() 191 192 for p in workerProc: 193 p.join() 194 195 writerQueue.put((None, None, None)) 196 writeProc.join() 197 198if __name__ == '__main__': 199 print __prog_name__ + ' v' + __version__ + ': ' + __prog_desc__ 200 print ' by ' + __author__ + ' (' + __email__ + ')' + '\n' 201 202 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 203 parser.add_argument('output_dir', help='output directory') 204 parser.add_argument('-u', '--ubiquity', help='ubiquity threshold for defining marker set', type=float, default = 0.97) 205 parser.add_argument('-s', '--single_copy', help='single-copy threshold for defining marker set', type=float, default = 0.97) 206 parser.add_argument('-m', '--min_genomes', help='minimum genomes required to infer marker set', type=int, default = 2) 207 parser.add_argument('-d', '--distance_threshold', help='genomic distance to be considered co-located', type=float, default=5000) 208 parser.add_argument('-g', '--genome_threshold', help='percentage of genomes required to be considered co-located', type=float, default=0.95) 209 parser.add_argument('-t', '--threads', type=int, help='number of threads', default=1) 210 211 args = parser.parse_args() 212 213 try: 214 taxonomicMarkerSets = TaxonomicMarkerSets() 215 taxonomicMarkerSets.run(args.output_dir, args.ubiquity, args. single_copy, args.min_genomes, args.distance_threshold, args.genome_threshold, args.threads) 216 except SystemExit: 217 print "\nControlled exit resulting from an unrecoverable error or warning." 218 except: 219 print "\nUnexpected error:", sys.exc_info()[0] 220 raise 221