"""
Tools for reading a nexus file
"""
import os
import gzip

try:  # pragma: no cover
    from StringIO import StringIO
except ImportError:
    from io import StringIO

from nexus.handlers import GenericHandler
from nexus.handlers import BEGIN_PATTERN, END_PATTERN
from nexus.handlers.taxa import TaxaHandler
from nexus.handlers.data import CharacterHandler, DataHandler
from nexus.handlers.tree import TreeHandler
from nexus.exceptions import NexusFormatException


class NexusReader(object):
    """
    A nexus reader.

    Raw block lines are collected in ``raw_blocks`` (keyed by lower-cased
    block name), parsed by the matching handler into ``blocks``, and each
    parsed block is also exposed as an attribute of the reader named after
    the block.
    """

    def __init__(self, filename=None, debug=False):
        """
        Create a reader, optionally loading a file straight away.

        :param filename: filename of a nexus file to read, or None to
            create an empty reader.
        :type filename: string

        :param debug: debug flag, stored on the instance as ``debug``.
        :type debug: bool
        """
        self.debug = debug
        self.blocks = {}
        self.raw_blocks = {}
        # Block name -> handler class. Block names without an entry here
        # are parsed with GenericHandler (see _do_blocks).
        self.handlers = {
            'data': DataHandler,
            'characters': CharacterHandler,
            'trees': TreeHandler,
            'taxa': TaxaHandler,
        }
        if filename:
            self.read_file(filename)

    def _do_blocks(self):
        """Iterates over all nexus blocks and parses them appropriately"""
        for block, data in self.raw_blocks.items():
            self.blocks[block] = self.handlers.get(block, GenericHandler)()
            self.blocks[block].parse(data)

        # Expose a `characters` block under the `data` key as well when no
        # (truthy) `data` block was read.
        if self.blocks.get('characters') and not self.blocks.get('data'):
            self.blocks['data'] = self.blocks['characters']

        # Make every parsed block reachable as an attribute, e.g. self.taxa
        for block, handler in self.blocks.items():
            setattr(self, block, handler)

    def read_file(self, filename):
        """
        Loads and Parses a Nexus File

        Filenames ending in ``.gz`` are opened with gzip.

        :param filename: filename of a nexus file
        :type filename: string

        :raises IOError: If file reading fails.

        :return: None
        """
        self.filename = filename
        self.short_filename = os.path.split(filename)[1]

        if not os.path.isfile(filename):
            raise IOError("Unable To Read File %s" % filename)

        if filename.endswith('.gz'):
            handle = gzip.open(filename, 'rb')  # pragma: no cover
        else:
            handle = open(filename, 'r')
        # Close the handle even when parsing raises (e.g. duplicate block).
        try:
            self._read(handle)
        finally:
            handle.close()

    def read_string(self, contents):
        """
        Loads and Parses a Nexus from a string

        :param contents: string or string-like object containing a nexus
        :type contents: string

        :return: this reader, so that calls can be chained
        """
        self.filename = "<String>"
        self._read(StringIO(contents))
        return self

    def _read(self, handle):
        """
        Reads from an iterable object yielding lines, splits the lines
        into blocks, then parses the blocks.

        :param handle: iterable of lines (text or utf-8 encoded bytes)

        :raises NexusFormatException: If a block name occurs twice.

        :return: None
        """
        store = {}
        block = None
        for line in handle:
            # binary handles (e.g. gzip opened with 'rb') yield bytes
            if hasattr(line, 'decode'):
                line = line.decode('utf-8')
            line = line.strip()
            if not line:
                continue
            elif line.startswith('[') and line.endswith(']'):
                # skip lines wholly enclosed in square brackets
                continue

            # check if we're in a block and initialise
            found = BEGIN_PATTERN.findall(line)
            if found:
                block = found[0][0].lower()
                if block in store:
                    raise NexusFormatException("Duplicate Block %s" % block)
                store[block] = []

            # check if we're ending a block; the end line itself is kept
            # as the last line of the block being closed
            if END_PATTERN.search(line):
                if block:
                    store[block].append(line)
                block = None

            if block:
                store[block].append(line)
        self.raw_blocks = store
        self._do_blocks()

    def write(self):
        """
        Generates a string containing a complete nexus from
        all the data.

        :return: String
        """
        out = ["#NEXUS\n"]
        for block in self.blocks:
            out.append(self.blocks[block].write())
            # empty line after block if needed
            if len(self.blocks) > 1:
                out.append("\n")
        return "\n".join(out)

    def write_to_file(self, filename):
        """
        Writes the nexus to a file.

        :param filename: filename of the file to write
        :type filename: string

        :return: None

        :raises IOError: If file writing fails.
        """
        # `with` closes the file even if generating or writing the nexus
        # fails; write() emits the string in one call rather than having
        # writelines() iterate over it character by character.
        with open(filename, 'w') as handle:
            handle.write(self.write())