1############################################################################### 2# 3# markerSet.py - Calculate and process marker sets. 4# 5############################################################################### 6# # 7# This program is free software: you can redistribute it and/or modify # 8# it under the terms of the GNU General Public License as published by # 9# the Free Software Foundation, either version 3 of the License, or # 10# (at your option) any later version. # 11# # 12# This program is distributed in the hope that it will be useful, # 13# but WITHOUT ANY WARRANTY; without even the implied warranty of # 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # 15# GNU General Public License for more details. # 16# # 17# You should have received a copy of the GNU General Public License # 18# along with this program. If not, see <http://www.gnu.org/licenses/>. # 19# # 20############################################################################### 21 22import os 23import sys 24import logging 25import uuid 26import tempfile 27import shutil 28import multiprocessing as mp 29import cPickle as pickle 30import gzip 31 32from checkm.defaultValues import DefaultValues 33from checkm.hmmer import HMMERRunner 34from checkm.hmmerModelParser import HmmModelParser 35 36from checkm.util.pfam import PFAM 37 38 39class BinMarkerSets(): 40 """A collection of one or more marker sets associated with a bin.""" 41 42 # type of marker set 43 TAXONOMIC_MARKER_SET = 1 44 TREE_MARKER_SET = 2 45 HMM_MODELS_SET = 3 46 47 def __init__(self, binId, markerSetType): 48 self.logger = logging.getLogger('timestamp') 49 self.markerSets = [] 50 self.binId = binId 51 self.markerSetType = markerSetType 52 self.selectedLinageSpecificMarkerSet = None 53 54 def numMarkerSets(self): 55 """Number of marker sets associated with bin.""" 56 return len(self.markerSets) 57 58 def addMarkerSet(self, markerSet): 59 """Add marker set to bin.""" 60 self.markerSets.append(markerSet) 61 62 def markerSetIter(self): 63 """Generator function for iterating over marker sets.""" 64 for markerSet in self.markerSets: 65 yield markerSet 66 67 def getMarkerGenes(self): 68 """Get marker genes from all marker sets.""" 69 markerGenes = set() 70 for ms in self.markerSets: 71 markerGenes.update(ms.getMarkerGenes()) 72 73 return markerGenes 74 75 def mostSpecificMarkerSet(self): 76 return self.markerSets[0] 77 78 def treeMarkerSet(self): 79 pass 80 81 def selectedMarkerSet(self): 82 """Return the 'selected' marker set for this bin.""" 83 if self.markerSetType == self.TAXONOMIC_MARKER_SET: 84 return self.mostSpecificMarkerSet() 85 elif self.markerSetType == self.TREE_MARKER_SET: 86 return self.selectedLinageSpecificMarkerSet 87 else: 88 # there should be a single marker set associate with this bin 89 if len(self.markerSets) == 1: 90 return self.markerSets[0] 91 92 self.logger.error(' [Error] Expect a single marker set to be associated with each bin.') 93 sys.exit(1) 94 95 def setLineageSpecificSelectedMarkerSet(self, selectedMarkerSetMap): 96 uid = self.mostSpecificMarkerSet().UID 97 98 selectedId = selectedMarkerSetMap[uid] 99 self.selectedLinageSpecificMarkerSet = None 100 101 while not self.selectedLinageSpecificMarkerSet: 102 for ms in self.markerSets: 103 if ms.UID == selectedId: 104 self.selectedLinageSpecificMarkerSet = ms 105 break 106 107 if not self.selectedLinageSpecificMarkerSet: 108 # This is a hack for the reduced tree. Since not all 109 # marker sets are in the reduced tree it is possible the 110 # selected marker set might not be avaliable. In this case, 111 # we should move to the next suitable marker set. Ideally, 112 # this could be avoided by just forcing in the selected 113 # marker set. 114 selectedId = selectedMarkerSetMap[selectedId] 115 else: 116 break 117 118 if self.selectedLinageSpecificMarkerSet == None: 119 # something has gone wrong 120 self.logger.error(' [Error] Failed to set a selected lineage-specific marker set.') 121 sys.exit(1) 122 123 def removeMarkers(self, markersToRemove): 124 """Remove specified markers from all marker sets.""" 125 for markerSet in self.markerSets: 126 curMarkerSet = markerSet.markerSet 127 128 newMarkerSet = [] 129 for ms in curMarkerSet: 130 newMS = ms - markersToRemove 131 132 if len(newMS) != 0: 133 newMarkerSet.append(newMS) 134 135 markerSet.markerSet = newMarkerSet 136 137 def write(self, fout): 138 """Write marker set to file.""" 139 fout.write(self.binId) 140 fout.write('\t' + str(len(self.markerSets))) 141 for ms in self.markerSets: 142 fout.write('\t' + str(ms)) 143 fout.write('\n') 144 145 def read(self, line): 146 """Construct bin marker set data from line.""" 147 lineSplit = line.split('\t') 148 numMarkerSets = int(lineSplit[1]) 149 for i in xrange(0, numMarkerSets): 150 uid = lineSplit[i * 4 + 2] 151 lineageStr = lineSplit[i * 4 + 3] 152 numGenomes = int(lineSplit[i * 4 + 4]) 153 markerSet = eval(lineSplit[i * 4 + 5]) 154 self.markerSets.append(MarkerSet(uid, lineageStr, numGenomes, markerSet)) 155 156 157class MarkerSet(): 158 """A collection of marker genes organized into co-located sets.""" 159 def __init__(self, UID, lineageStr, numGenomes, markerSet): 160 self.logger = logging.getLogger('timestamp') 161 162 self.UID = UID # unique ID of marker set 163 self.lineageStr = lineageStr # taxonomic string associated with marker set 164 self.numGenomes = numGenomes # number of genomes used to calculate marker set 165 self.markerSet = markerSet # marker genes organized into co-located sets 166 167 def __repr__(self): 168 return str(self.UID) + '\t' + self.lineageStr + '\t' + str(self.numGenomes) + '\t' + str(self.markerSet) 169 170 def size(self): 171 """Number of marker genes and marker gene sets.""" 172 numMarkerGenes = 0 173 for m in self.markerSet: 174 numMarkerGenes += len(m) 175 176 return numMarkerGenes, len(self.markerSet) 177 178 def numMarkers(self): 179 """Number of marker genes.""" 180 return self.size()[0] 181 182 def numSets(self): 183 """Number of marker sets.""" 184 return len(self.markerSet) 185 186 def getMarkerGenes(self): 187 """Get marker genes within marker set.""" 188 markerGenes = set() 189 for m in self.markerSet: 190 for marker in m: 191 markerGenes.add(marker) 192 193 return markerGenes 194 195 def removeMarkers(self, markersToRemove): 196 """Remove specified markers from marker sets.""" 197 newMarkerSet = [] 198 for ms in self.markerSet: 199 newMS = ms - markersToRemove 200 201 if len(newMS) != 0: 202 newMarkerSet.append(newMS) 203 204 self.markerSet = newMarkerSet 205 206 def genomeCheck(self, hits, bIndividualMarkers): 207 """Calculate genome completeness and contamination.""" 208 if bIndividualMarkers: 209 present = 0 210 multiCopyCount = 0 211 for marker in self.getMarkerGenes(): 212 if marker in hits: 213 present += 1 214 multiCopyCount += (len(hits[marker]) - 1) 215 216 percComp = 100 * float(present) / self.numMarkers() 217 percCont = 100 * float(multiCopyCount) / self.numMarkers() 218 else: 219 comp = 0.0 220 cont = 0.0 221 for ms in self.markerSet: 222 present = 0 223 multiCopy = 0 224 for marker in ms: 225 count = len(hits.get(marker, [])) 226 if count == 1: 227 present += 1 228 elif count > 1: 229 present += 1 230 multiCopy += (count - 1) 231 232 comp += float(present) / len(ms) 233 cont += float(multiCopy) / len(ms) 234 235 percComp = 100 * comp / len(self.markerSet) 236 percCont = 100 * cont / len(self.markerSet) 237 238 return percComp, percCont 239 240 241class MarkerSetParser(): 242 """Parse marker set file.""" 243 244 def __init__(self, threads=1): 245 self.logger = logging.getLogger('timestamp') 246 self.numThreads = threads 247 248 def getMarkerSets(self, outDir, binIds, markerFile, excludeMarkersFile=None): 249 """Determine marker set for each bin.""" 250 251 # determine type of marker set file 252 markerFileType = self.markerFileType(markerFile) 253 254 # get marker set for each bin 255 binIdToBinMarkerSets = {} 256 257 if markerFileType == BinMarkerSets.TAXONOMIC_MARKER_SET: 258 binMarkerSets = self.parseTaxonomicMarkerSetFile(markerFile) 259 260 for binId in binIds: 261 binIdToBinMarkerSets[binId] = binMarkerSets 262 elif markerFileType == BinMarkerSets.TREE_MARKER_SET: 263 binIdToBinMarkerSets = self.parseLineageMarkerSetFile(markerFile) 264 else: 265 markers = [set()] 266 modelParser = HmmModelParser(markerFile) 267 for model in modelParser.parse(): 268 markers[0].add(model.acc) 269 markerSet = MarkerSet(0, "N/A", -1, markers) 270 271 for binId in binIds: 272 binMarkerSets = BinMarkerSets(binId, BinMarkerSets.HMM_MODELS_SET) 273 binMarkerSets.addMarkerSet(markerSet) 274 binIdToBinMarkerSets[binId] = binMarkerSets 275 276 # remove marker genes specified by user or marker for exclusion 277 markersToExclude = set() 278 if excludeMarkersFile: 279 markersToExclude = self.readExcludeMarkersFile(excludeMarkersFile) 280 281 markersToExclude.update(DefaultValues.MARKERS_TO_EXCLUDE) 282 for binId, binMarkerSet in binIdToBinMarkerSets.iteritems(): 283 binMarkerSet.removeMarkers(markersToExclude) 284 285 return binIdToBinMarkerSets 286 287 def readExcludeMarkersFile(self, excludeMarkersFile): 288 """Parse file specifying markers to exclude.""" 289 markersToExclude = set() 290 for line in open(excludeMarkersFile): 291 if line[0] == '#': 292 continue 293 294 marker = line.strip() 295 markersToExclude.add(marker) 296 297 return markersToExclude 298 299 def createHmmModels(self, outDir, binIds, markerFile): 300 """Create HMM model for each bins marker set.""" 301 302 # determine type of marker set file 303 markerFileType = self.markerFileType(markerFile) 304 305 # get HMM file for each bin 306 binIdToModels = {} 307 if markerFileType == BinMarkerSets.TAXONOMIC_MARKER_SET: 308 hmmModelFile = self.createHmmModelFile(binIds.keys()[0], markerFile) 309 310 modelParser = HmmModelParser(hmmModelFile) 311 models = modelParser.models() 312 for binId in binIds: 313 binIdToModels[binId] = models 314 315 os.remove(hmmModelFile) 316 elif markerFileType == BinMarkerSets.TREE_MARKER_SET: 317 binIdToModels = self.__createLineageHmmModels(binIds, markerFile) 318 else: 319 modelParser = HmmModelParser(markerFile) 320 models = modelParser.models() 321 for binId in binIds: 322 binIdToModels[binId] = models 323 324 return binIdToModels 325 326 def createHmmModelFile(self, binId, markerFile): 327 """Create HMM file for from a bin's marker set.""" 328 329 # determine type of marker set file 330 markerFileType = self.markerFileType(markerFile) 331 332 # create HMM file 333 hmmModelFile = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) 334 if markerFileType == BinMarkerSets.TAXONOMIC_MARKER_SET: 335 binMarkerSets = self.parseTaxonomicMarkerSetFile(markerFile) 336 self.__createMarkerHMMs(binMarkerSets, hmmModelFile, bReportProgress=False) 337 elif markerFileType == BinMarkerSets.TREE_MARKER_SET: 338 binIdToBinMarkerSets = self.parseLineageMarkerSetFile(markerFile) 339 self.__createMarkerHMMs(binIdToBinMarkerSets[binId], hmmModelFile, bReportProgress=False) 340 else: 341 shutil.copyfile(markerFile, hmmModelFile) 342 343 return hmmModelFile 344 345 def __createLineageHmmModels(self, binIds, markerFile): 346 """Create lineage-specific HMMs for each bin.""" 347 348 self.logger.info('Extracting lineage-specific HMMs with %d threads:' % self.numThreads) 349 350 workerQueue = mp.Queue() 351 writerQueue = mp.Queue() 352 353 for binId in binIds: 354 workerQueue.put(binId) 355 356 for _ in range(self.numThreads): 357 workerQueue.put(None) 358 359 binIdToModels = mp.Manager().dict() 360 361 try: 362 calcProc = [mp.Process(target=self.__fetchModelInfo, args=(binIdToModels, markerFile, workerQueue, writerQueue)) for _ in range(self.numThreads)] 363 writeProc = mp.Process(target=self.__reportFetchProgress, args=(len(binIds), writerQueue)) 364 365 writeProc.start() 366 367 for p in calcProc: 368 p.start() 369 370 for p in calcProc: 371 p.join() 372 373 writerQueue.put(None) 374 writeProc.join() 375 except: 376 # make sure all processes are terminated 377 for p in calcProc: 378 p.terminate() 379 380 writeProc.terminate() 381 382 # create a standard dictionary from the managed dictionary 383 d = {} 384 for binId in binIdToModels.keys(): 385 d[binId] = binIdToModels[binId] 386 387 return d 388 389 def __fetchModelInfo(self, binIdToModels, markerFile, queueIn, queueOut): 390 """Fetch HMM.""" 391 while True: 392 binId = queueIn.get(block=True, timeout=None) 393 if binId == None: 394 break 395 396 hmmModelFile = self.createHmmModelFile(binId, markerFile) 397 398 modelParser = HmmModelParser(hmmModelFile) 399 binIdToModels[binId] = modelParser.models() 400 401 os.remove(hmmModelFile) 402 403 queueOut.put(binId) 404 405 def __reportFetchProgress(self, numBins, queueIn): 406 """Report progress of extracted HMMs.""" 407 408 numProcessedBins = 0 409 if self.logger.getEffectiveLevel() <= logging.INFO: 410 statusStr = ' Finished extracting HMMs for %d of %d (%.2f%%) bins.' % (numProcessedBins, numBins, float(numProcessedBins) * 100 / numBins) 411 sys.stderr.write('%s\r' % statusStr) 412 sys.stderr.flush() 413 414 while True: 415 binId = queueIn.get(block=True, timeout=None) 416 if binId == None: 417 break 418 419 if self.logger.getEffectiveLevel() <= logging.INFO: 420 numProcessedBins += 1 421 statusStr = ' Finished extracting HMMs for %d of %d (%.2f%%) bins.' % (numProcessedBins, numBins, float(numProcessedBins) * 100 / numBins) 422 sys.stderr.write('%s\r' % statusStr) 423 sys.stderr.flush() 424 425 if self.logger.getEffectiveLevel() <= logging.INFO: 426 sys.stderr.write('\n') 427 428 def markerFileType(self, markerFile): 429 """Determine type of marker file.""" 430 with open(markerFile, 'r') as f: 431 header = f.readline() 432 433 if DefaultValues.TAXON_MARKER_FILE_HEADER in header: 434 return BinMarkerSets.TAXONOMIC_MARKER_SET 435 elif DefaultValues.LINEAGE_MARKER_FILE_HEADER in header: 436 return BinMarkerSets.TREE_MARKER_SET 437 elif 'HMMER3' in header: 438 return BinMarkerSets.HMM_MODELS_SET 439 else: 440 self.logger.error('Unrecognized file type: ' + markerFile) 441 sys.exit(1) 442 443 def __createMarkerHMMs(self, binMarkerSet, outputFile, bReportProgress=True): 444 """Create HMM file for markers.""" 445 446 # get list of marker genes 447 markerGenes = binMarkerSet.getMarkerGenes() 448 449 # get all genes from the same clan as any marker gene 450 pfam = PFAM(DefaultValues.PFAM_CLAN_FILE) 451 genesInSameClan = pfam.genesInSameClan(markerGenes) 452 453 # extract marker genes along with all genes from the same clan 454 allMarkers = markerGenes | genesInSameClan 455 456 if bReportProgress: 457 self.logger.info("There are %d genes in the marker set and %d genes from the same PFAM clan." % (len(markerGenes), len(genesInSameClan))) 458 459 # create file with all model accession numbers 460 keyFile = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) 461 fout = open(keyFile, 'w') 462 for modelAcc in allMarkers: 463 fout.write(modelAcc + '\n') 464 fout.close() 465 466 # fetch specified models 467 HF = HMMERRunner(mode='fetch') 468 HF.fetch(DefaultValues.HMM_MODELS, keyFile, outputFile, bKeyFile=True) 469 470 # index the HMM file 471 if os.path.exists(outputFile + '.ssi'): 472 os.remove(outputFile + '.ssi') 473 HF.index(outputFile) 474 475 # remove key file 476 os.remove(keyFile) 477 478 def parseTaxonomicMarkerSetFile(self, markerSetFile): 479 """Parse marker set from a taxonomic-specific marker set file.""" 480 with open(markerSetFile) as f: 481 f.readline() # skip header 482 483 binLine = f.readline() 484 taxonId = binLine.split('\t')[0] 485 binMarkerSets = BinMarkerSets(taxonId, BinMarkerSets.TAXONOMIC_MARKER_SET) 486 binMarkerSets.read(binLine) 487 488 return binMarkerSets 489 490 def parseLineageMarkerSetFile(self, markerSetFile): 491 """Parse marker sets from a lineage-specific marker set file.""" 492 493 # read all marker sets 494 binIdToBinMarkerSets = {} 495 with open(markerSetFile) as f: 496 f.readline() # skip header 497 498 for line in f: 499 lineSplit = line.split('\t') 500 binId = lineSplit[0] 501 502 binMarkerSets = BinMarkerSets(binId, BinMarkerSets.TREE_MARKER_SET) 503 binMarkerSets.read(line) 504 505 # determine selected marker set 506 selectedMarkerSetMap = self.parseSelectedMarkerSetMap() 507 binMarkerSets.setLineageSpecificSelectedMarkerSet(selectedMarkerSetMap) 508 509 binIdToBinMarkerSets[binId] = binMarkerSets 510 511 return binIdToBinMarkerSets 512 513 def parseSelectedMarkerSetMap(self): 514 selectedMarkerSetMap = {} 515 for line in open(DefaultValues.SELECTED_MARKER_SETS): 516 lineSplit = line.split('\t') 517 internalID = lineSplit[0] 518 selectedID = lineSplit[1].rstrip() 519 520 selectedMarkerSetMap[internalID] = selectedID 521 522 return selectedMarkerSetMap 523 524 def writeBinModels(self, binIdToModels, filename): 525 """Save HMM model info for each bin to file.""" 526 527 self.logger.info('Saving HMM info to file.') 528 529 with gzip.open(filename, 'wb') as output: 530 pickle.dump(binIdToModels, output, pickle.HIGHEST_PROTOCOL) 531 532 def loadBinModels(self, filename): 533 """Read HMM model info for each bin from file.""" 534 535 self.logger.info('Reading HMM info from file.') 536 537 with gzip.open(filename, 'rb') as f: 538 binIdToModels = pickle.load(f) 539 540 return binIdToModels 541