1#!/usr/bin/env python 2 3############################################################################### 4# # 5# This program is free software: you can redistribute it and/or modify # 6# it under the terms of the GNU General Public License as published by # 7# the Free Software Foundation, either version 3 of the License, or # 8# (at your option) any later version. # 9# # 10# This program is distributed in the hope that it will be useful, # 11# but WITHOUT ANY WARRANTY; without even the implied warranty of # 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # 13# GNU General Public License for more details. # 14# # 15# You should have received a copy of the GNU General Public License # 16# along with this program. If not, see <http://www.gnu.org/licenses/>. # 17# # 18############################################################################### 19 20""" 21Use 'add-one-more' analysis to assess stability of marker sets for varying 22 numbers of reference genomes. 23""" 24 25__author__ = 'Donovan Parks' 26__copyright__ = 'Copyright 2013' 27__credits__ = ['Donovan Parks'] 28__license__ = 'GPL3' 29__version__ = '1.0.0' 30__maintainer__ = 'Donovan Parks' 31__email__ = 'donovan.parks@gmail.com' 32__status__ = 'Development' 33 34import sys 35import argparse 36import random 37import multiprocessing as mp 38 39from checkm.lib.img import IMG 40from lib.markerSet import MarkerSet 41 42from numpy import mean, std 43 44class MarkerSetStability(object): 45 def __init__(self): 46 self.img = IMG() 47 self.markerset = MarkerSet() 48 49 def __processLineage(self, metadata, ubiquityThreshold, singleCopyThreshold, minGenomes, queueIn, queueOut): 50 """Assess stability of marker set for a specific named taxonomic group.""" 51 while True: 52 lineage = queueIn.get(block=True, timeout=None) 53 if lineage == None: 54 break 55 56 genomeIds = self.img.genomeIdsByTaxonomy(lineage, metadata, 'trusted') 57 58 changeMarkerSetSize = {} 59 markerGenes = [] 60 if len(genomeIds) >= minGenomes: 61 # calculate marker set for all genomes in lineage 62 geneCountTable = self.img.geneCountTable(genomeIds) 63 markerGenes = self.markerset.markerGenes(genomeIds, geneCountTable, ubiquityThreshold*len(genomeIds), singleCopyThreshold*len(genomeIds)) 64 tigrToRemove = self.img.identifyRedundantTIGRFAMs(markerGenes) 65 markerGenes = markerGenes - tigrToRemove 66 67 for selectPer in xrange(50, 101, 5): 68 numGenomesToSelect = int(float(selectPer)/100 * len(genomeIds)) 69 perChange = [] 70 for _ in xrange(0, 10): 71 # calculate marker set for subset of genomes 72 subsetGenomeIds = random.sample(genomeIds, numGenomesToSelect) 73 geneCountTable = self.img.geneCountTable(subsetGenomeIds) 74 subsetMarkerGenes = self.markerset.markerGenes(subsetGenomeIds, geneCountTable, ubiquityThreshold*numGenomesToSelect, singleCopyThreshold*numGenomesToSelect) 75 tigrToRemove = self.img.identifyRedundantTIGRFAMs(subsetMarkerGenes) 76 subsetMarkerGenes = subsetMarkerGenes - tigrToRemove 77 78 perChange.append(float(len(markerGenes.symmetric_difference(subsetMarkerGenes)))*100.0 / len(markerGenes)) 79 80 changeMarkerSetSize[selectPer] = [mean(perChange), std(perChange)] 81 82 queueOut.put((lineage, len(genomeIds), len(markerGenes), changeMarkerSetSize)) 83 84 def __storeResults(self, outputFile, totalLineages, writerQueue): 85 """Store results to file.""" 86 87 fout = open(outputFile, 'w') 88 fout.write('Lineage\t# genomes\t# markers\tsubsample %\tmean % change\tstd % change\n') 89 90 numProcessedLineages = 0 91 while True: 92 lineage, numGenomes, numMarkerGenes, changeMarkerSetSize = writerQueue.get(block=True, timeout=None) 93 if lineage == None: 94 break 95 96 numProcessedLineages += 1 97 statusStr = ' Finished processing %d of %d (%.2f%%) lineages.' % (numProcessedLineages, totalLineages, float(numProcessedLineages)*100/totalLineages) 98 sys.stdout.write('%s\r' % statusStr) 99 sys.stdout.flush() 100 101 for selectPer in sorted(changeMarkerSetSize.keys()): 102 fout.write('%s\t%d\t%d\t%d\t%f\t%f\n' % (lineage, numGenomes, numMarkerGenes, selectPer, changeMarkerSetSize[selectPer][0], changeMarkerSetSize[selectPer][1])) 103 104 sys.stdout.write('\n') 105 106 fout.close() 107 108 109 def run(self, outputFile, ubiquityThreshold, singleCopyThreshold, minGenomes, mostSpecificRank, numThreads): 110 """Calculate stability of marker sets for named taxonomic groups.""" 111 112 print ' Calculating stability of marker sets:' 113 114 random.seed(1) 115 116 # process each sequence in parallel 117 workerQueue = mp.Queue() 118 writerQueue = mp.Queue() 119 120 metadata = self.img.genomeMetadata() 121 lineages = self.img.lineagesByCriteria(metadata, minGenomes, mostSpecificRank) 122 123 #lineages = ['Bacteria'] 124 #lineages += ['Bacteria;Proteobacteria'] 125 #lineages += ['Bacteria;Proteobacteria;Gammaproteobacteria'] 126 #lineages += ['Bacteria;Proteobacteria;Gammaproteobacteria;Enterobacteriales'] 127 #lineages += ['Bacteria;Proteobacteria;Gammaproteobacteria;Enterobacteriales;Enterobacteriaceae'] 128 #lineages += ['Bacteria;Proteobacteria;Gammaproteobacteria;Enterobacteriales;Enterobacteriaceae;Escherichia'] 129 #lineages += ['Bacteria;Proteobacteria;Gammaproteobacteria;Enterobacteriales;Enterobacteriaceae;Escherichia;coli'] 130 131 #lineages = ['Archaea'] 132 #lineages += ['Archaea;Euryarchaeota'] 133 #lineages += ['Archaea;Euryarchaeota;Methanomicrobia'] 134 #lineages += ['Archaea;Euryarchaeota;Methanomicrobia;Methanosarcinales'] 135 #lineages += ['Archaea;Euryarchaeota;Methanomicrobia;Methanosarcinales;Methanosarcinaceae'] 136 137 for lineage in lineages: 138 workerQueue.put(lineage) 139 140 for _ in range(numThreads): 141 workerQueue.put(None) 142 143 calcProc = [mp.Process(target = self.__processLineage, args = (metadata, ubiquityThreshold, singleCopyThreshold, minGenomes, workerQueue, writerQueue)) for _ in range(numThreads)] 144 writeProc = mp.Process(target = self.__storeResults, args = (outputFile, len(lineages), writerQueue)) 145 146 writeProc.start() 147 148 for p in calcProc: 149 p.start() 150 151 for p in calcProc: 152 p.join() 153 154 writerQueue.put((None, None, None, None)) 155 writeProc.join() 156 157if __name__ == '__main__': 158 parser = argparse.ArgumentParser(description="assess stability of marker sets", 159 formatter_class=argparse.ArgumentDefaultsHelpFormatter) 160 161 parser.add_argument('output_file', help='Output file.') 162 parser.add_argument('-u', '--ubiquity', help='Ubiquity threshold for defining marker set', type=float, default = 0.97) 163 parser.add_argument('-s', '--single_copy', help='Single-copy threshold for defining marker set', type=float, default = 0.97) 164 parser.add_argument('-m', '--min_genomes', help='Minimum genomes required to include in analysis', type=int, default = 20) 165 parser.add_argument('-r', '--most_specific_rank', help='Most specific rank to include in analysis', type=int, default = 6) 166 parser.add_argument('-t', '--threads', help='Threads to use', type=int, default = 32) 167 168 args = parser.parse_args() 169 170 markerSetStability = MarkerSetStability() 171 markerSetStability.run(args.output_file, args.ubiquity, args.single_copy, args.min_genomes, args.most_specific_rank, args.threads) 172