import warnings

import h5py

from ont_fast5_api import CURRENT_FAST5_VERSION
from ont_fast5_api.fast5_file import Fast5File, Fast5FileTypeError
from ont_fast5_api.fast5_read import AbstractFast5, Fast5Read
from ont_fast5_api.static_data import HARDLINK_GROUPS, OPTIONAL_READ_GROUPS


class MultiFast5File(AbstractFast5):
    def __init__(self, filename, mode='r', driver=None):
        # See https://docs.h5py.org/en/stable/high/file.html#file-drivers for
        # information on why you might want to use a specific driver.
        self.filename = filename
        self.mode = mode
        self.handle = h5py.File(self.filename, self.mode, driver=driver)
        self._run_id_map = None
        if mode != 'r' and 'file_version' not in self.handle.attrs:
            try:
                self.handle.attrs['file_version'] = str(CURRENT_FAST5_VERSION)
            except IOError as e:
                raise IOError("Could not write 'file_version' in mode '{}' to fast5 file: {}"
                              "".format(self.mode, self.filename)) from e

    def get_reads(self):
        for group_name in self.handle:
            if group_name.startswith('read_'):
                yield Fast5Read(self, group_name[5:])

    def get_read_ids(self):
        # Return all read groups with the 'read_' prefix stripped from the front
        return [group_name[5:] for group_name in self.handle if group_name.startswith('read_')]

    def get_read(self, read_id):
        group_name = "read_" + read_id
        if group_name not in self.handle:
            raise KeyError("Read '{}' not in: {}".format(group_name, self.filename))
        return Fast5Read(self, read_id)

    def create_read(self, read_id, run_id):
        warnings.warn("'MultiFast5File.create_read()' will be deprecated. "
                      "Use `MultiFast5File.create_empty_read()` instead",
                      DeprecationWarning)
        return self.create_empty_read(read_id, run_id)

    def create_empty_read(self, read_id, run_id):
        group_name = "read_" + read_id
        if group_name not in self.handle:
            try:
                self.handle.create_group(group_name)
            except ValueError as e:
                raise ValueError("Could not create group '{}' in file: {}".format(group_name, self.filename)) from e
            try:
                for shared_group in HARDLINK_GROUPS:
                    self.handle["{}/{}".format(group_name, shared_group)] = \
                        self.handle["read_{}/{}".format(self.run_id_map[run_id], shared_group)]
            except KeyError:
                # If we can't hardlink to existing groups then continue as normal,
                # registering this read as the new source of metadata for this run_id
                self.run_id_map[run_id] = read_id
            self.handle[group_name].attrs["run_id"] = run_id
        else:
            raise ValueError("Read '{}' already exists in: {}".format(group_name, self.filename))
        return Fast5Read(self, read_id)

    @property
    def run_id_map(self):
        if self._run_id_map is None:
            self._run_id_map = dict()
            for read in self.get_reads():
                try:
                    self._run_id_map[read.run_id] = read.read_id
                except KeyError:
                    # A read without a run_id raises a KeyError; skip it since
                    # it cannot be used as a hardlink target anyway
                    pass
        return self._run_id_map

    def add_existing_read(self, read_to_add, target_compression=None, sanitize=False):
        if isinstance(read_to_add, Fast5File):
            self._add_read_from_single(read_to_add, target_compression, sanitize=sanitize)
        elif isinstance(read_to_add.parent, MultiFast5File):
            self._add_read_from_multi(read_to_add, target_compression, sanitize=sanitize)
        else:
            raise Fast5FileTypeError("Could not add read to output file from input file type '{}' with path '{}'"
                                     "".format(type(read_to_add.parent), read_to_add.parent.filename))

    def _add_read_from_multi(self, read_to_add, target_compression, sanitize=False):
        read_name = "read_" + read_to_add.read_id
        self.handle.create_group(read_name)
        output_group = self.handle[read_name]
        copy_attributes(read_to_add.handle.attrs, output_group)
        for subgroup in read_to_add.handle:
            if sanitize and subgroup in OPTIONAL_READ_GROUPS:
                # Skip optional groups when sanitizing
                continue
            elif subgroup == read_to_add.raw_dataset_group_name \
                    and target_compression is not None \
                    and str(target_compression) not in read_to_add.raw_compression_filters:
                # Raw data needs recompressing, so copy it via add_raw_data()
                raw_attrs = read_to_add.handle[read_to_add.raw_dataset_group_name].attrs
                raw_data = read_to_add.handle[read_to_add.raw_dataset_name]
                output_read = self.get_read(read_to_add.read_id)
                output_read.add_raw_data(raw_data, raw_attrs, compression=target_compression)
                continue
            elif subgroup in HARDLINK_GROUPS:
                if read_to_add.run_id in self.run_id_map:
                    # There may be a group to link to, but we must check it actually exists!
                    hardlink_source = "read_{}/{}".format(self.run_id_map[read_to_add.run_id], subgroup)
                    if hardlink_source in self.handle:
                        hardlink_dest = "read_{}/{}".format(read_to_add.read_id, subgroup)
                        self.handle[hardlink_dest] = self.handle[hardlink_source]
                        continue
                # If we couldn't hardlink to anything, make the group we are about
                # to copy available as a hardlink target for future reads
                self.run_id_map[read_to_add.run_id] = read_to_add.read_id
            # If we haven't done a special-case copy then we can fall back on the default copy
            output_group.copy(read_to_add.handle[subgroup], subgroup)

    def _add_read_from_single(self, read_to_add, target_compression, sanitize=False):
        read_name = "read_" + read_to_add.read_id
        self.handle.create_group(read_name)
        output_group = self.handle[read_name]
        copy_attributes(read_to_add.handle.attrs, output_group)
        for subgroup in read_to_add.handle:
            if sanitize and subgroup in OPTIONAL_READ_GROUPS:
                # Skip optional groups when sanitizing
                continue
            elif subgroup == "UniqueGlobalKey":
                for unique_group in read_to_add.handle["UniqueGlobalKey"]:
                    if unique_group in HARDLINK_GROUPS and read_to_add.run_id in self.run_id_map:
                        hardlink_source = "read_{}/{}".format(self.run_id_map[read_to_add.run_id], unique_group)
                        if hardlink_source in self.handle:
                            hardlink_dest = "read_{}/{}".format(read_to_add.read_id, unique_group)
                            self.handle[hardlink_dest] = self.handle[hardlink_source]
                    else:
                        output_group.copy(read_to_add.handle["UniqueGlobalKey/{}".format(unique_group)],
                                          unique_group)
                self.run_id_map[read_to_add.run_id] = read_to_add.read_id
            elif subgroup == "Raw":
                if target_compression is None or str(target_compression) in read_to_add.raw_compression_filters:
                    output_group.copy(read_to_add.handle[read_to_add.raw_dataset_group_name], "Raw")
                else:
                    raw_attrs = read_to_add.handle[read_to_add.raw_dataset_group_name].attrs
                    raw_data = read_to_add.handle[read_to_add.raw_dataset_name]
                    output_read = self.get_read(read_to_add.read_id)
                    output_read.add_raw_data(raw_data, raw_attrs, compression=target_compression)
            else:
                if not sanitize:
                    output_group.copy(read_to_add.handle[subgroup], subgroup)


def copy_attributes(input_attrs, output_group):
    # Copy every HDF5 attribute from the input attrs onto the output group
    for k, v in input_attrs.items():
        output_group.attrs[k] = v
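

# ---------------------------------------------------------------------------
# Illustrative usage sketch only, not part of the library API: the file paths
# below are hypothetical placeholders. It opens an existing multi-read fast5
# read-only, prints its read IDs, and copies every read into another
# multi-read file with add_existing_read(), which hardlinks the shared
# metadata groups between reads from the same run.
if __name__ == "__main__":
    input_path = "example_input.fast5"    # hypothetical input file
    output_path = "example_output.fast5"  # hypothetical output file

    with MultiFast5File(input_path, mode='r') as input_f5, \
            MultiFast5File(output_path, mode='a') as output_f5:
        print("Reads in {}: {}".format(input_path, input_f5.get_read_ids()))
        for read in input_f5.get_reads():
            # target_compression=None keeps the raw data as-is;
            # sanitize=True would drop the optional analysis groups.
            output_f5.add_existing_read(read, target_compression=None, sanitize=False)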