1#!/usr/bin/env python 2 3############################################################################### 4# # 5# This program is free software: you can redistribute it and/or modify # 6# it under the terms of the GNU General Public License as published by # 7# the Free Software Foundation, either version 3 of the License, or # 8# (at your option) any later version. # 9# # 10# This program is distributed in the hope that it will be useful, # 11# but WITHOUT ANY WARRANTY; without even the implied warranty of # 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # 13# GNU General Public License for more details. # 14# # 15# You should have received a copy of the GNU General Public License # 16# along with this program. If not, see <http://www.gnu.org/licenses/>. # 17# # 18############################################################################### 19 20""" 21Test stability of marker set for different named taxonomic groups. 22""" 23 24__author__ = 'Donovan Parks' 25__copyright__ = 'Copyright 2013' 26__credits__ = ['Donovan Parks'] 27__license__ = 'GPL3' 28__version__ = '1.0.0' 29__maintainer__ = 'Donovan Parks' 30__email__ = 'donovan.parks@gmail.com' 31__status__ = 'Development' 32 33import sys 34import argparse 35import random 36import multiprocessing as mp 37 38from checkm.lib.img import IMG 39from lib.markerSet import MarkerSet 40 41from numpy import mean, std 42 43class MarkerSetStabilityTest(object): 44 def __init__(self): 45 self.img = IMG() 46 self.markerset = MarkerSet() 47 48 def __processLineage(self, metadata, ubiquityThreshold, singleCopyThreshold, minGenomes, queueIn, queueOut): 49 """Assess stability of marker set for a specific named taxonomic group.""" 50 while True: 51 lineage = queueIn.get(block=True, timeout=None) 52 if lineage == None: 53 break 54 55 genomeIds = self.img.genomeIdsByTaxonomy(lineage, metadata, 'trusted') 56 57 markerGenes = [] 58 perChange = [] 59 numGenomesToSelect = int(0.9*len(genomeIds)) 60 if len(genomeIds) >= minGenomes: 61 # calculate marker set for all genomes in lineage 62 geneCountTable = self.img.geneCountTable(genomeIds) 63 markerGenes = self.markerset.markerGenes(genomeIds, geneCountTable, ubiquityThreshold*len(genomeIds), singleCopyThreshold*len(genomeIds)) 64 tigrToRemove = self.img.identifyRedundantTIGRFAMs(markerGenes) 65 66 markerGenes = markerGenes - tigrToRemove 67 68 for _ in xrange(0, 100): 69 # calculate marker set for subset of genomes 70 subsetGenomeIds = random.sample(genomeIds, numGenomesToSelect) 71 geneCountTable = self.img.geneCountTable(subsetGenomeIds) 72 subsetMarkerGenes = self.markerset.markerGenes(subsetGenomeIds, geneCountTable, ubiquityThreshold*numGenomesToSelect, singleCopyThreshold*numGenomesToSelect) 73 tigrToRemove = self.img.identifyRedundantTIGRFAMs(subsetMarkerGenes) 74 subsetMarkerGenes = subsetMarkerGenes - tigrToRemove 75 76 perChange.append(float(len(markerGenes.symmetric_difference(subsetMarkerGenes)))*100.0 / len(markerGenes)) 77 78 if perChange != []: 79 queueOut.put((lineage, len(genomeIds), len(markerGenes), numGenomesToSelect, mean(perChange), std(perChange))) 80 else: 81 queueOut.put((lineage, len(genomeIds), len(markerGenes), numGenomesToSelect, -1, -1)) 82 83 def __storeResults(self, outputFile, totalLineages, writerQueue): 84 """Store results to file.""" 85 86 fout = open(outputFile, 'w') 87 fout.write('Lineage\t# genomes\t# markers\t# sampled genomes\tmean % change\tstd % change\n') 88 89 numProcessedLineages = 0 90 while True: 91 lineage, numGenomes, numMarkerGenes, numSampledGenomes, meanPerChange, stdPerChange = writerQueue.get(block=True, timeout=None) 92 if lineage == None: 93 break 94 95 numProcessedLineages += 1 96 statusStr = ' Finished processing %d of %d (%.2f%%) lineages.' % (numProcessedLineages, totalLineages, float(numProcessedLineages)*100/totalLineages) 97 sys.stdout.write('%s\r' % statusStr) 98 sys.stdout.flush() 99 100 101 fout.write('%s\t%d\t%d\t%d\t%f\t%f\n' % (lineage, numGenomes, numMarkerGenes, numSampledGenomes, meanPerChange, stdPerChange)) 102 103 sys.stdout.write('\n') 104 105 fout.close() 106 107 108 def run(self, outputFile, ubiquityThreshold, singleCopyThreshold, minGenomes, mostSpecificRank, numThreads): 109 """Calculate stability of marker sets for named taxonomic groups.""" 110 111 print ' Testing stability of marker sets:' 112 113 random.seed(1) 114 115 # process each sequence in parallel 116 workerQueue = mp.Queue() 117 writerQueue = mp.Queue() 118 119 metadata = self.img.genomeMetadata() 120 lineages = self.img.lineagesByCriteria(metadata, minGenomes, mostSpecificRank) 121 122 for lineage in lineages: 123 workerQueue.put(lineage) 124 125 for _ in range(numThreads): 126 workerQueue.put(None) 127 128 calcProc = [mp.Process(target = self.__processLineage, args = (metadata, ubiquityThreshold, singleCopyThreshold, minGenomes, workerQueue, writerQueue)) for _ in range(numThreads)] 129 writeProc = mp.Process(target = self.__storeResults, args = (outputFile, len(lineages), writerQueue)) 130 131 writeProc.start() 132 133 for p in calcProc: 134 p.start() 135 136 for p in calcProc: 137 p.join() 138 139 writerQueue.put((None, None, None, None, None, None)) 140 writeProc.join() 141 142if __name__ == '__main__': 143 parser = argparse.ArgumentParser(description="assess stability of marker sets", 144 formatter_class=argparse.ArgumentDefaultsHelpFormatter) 145 146 parser.add_argument('output_file', help='Output file.') 147 parser.add_argument('-u', '--ubiquity', help='Ubiquity threshold for defining marker set', type=float, default = 0.97) 148 parser.add_argument('-s', '--single_copy', help='Single-copy threshold for defining marker set', type=float, default = 0.97) 149 parser.add_argument('-m', '--min_genomes', help='Minimum genomes required to include in analysis', type=int, default = 10) 150 parser.add_argument('-r', '--most_specific_rank', help='Most specific rank to include in analysis', type=int, default = 6) 151 parser.add_argument('-t', '--threads', help='Threads to use', type=int, default = 32) 152 153 args = parser.parse_args() 154 155 markerSetStabilityTest = MarkerSetStabilityTest() 156 markerSetStabilityTest.run(args.output_file, args.ubiquity, args.single_copy, args.min_genomes, args.most_specific_rank, args.threads) 157