1#!/usr/bin/env python 2 3""" MultiQC modules base class, contains helper functions """ 4 5from __future__ import print_function 6from collections import OrderedDict 7import io 8import fnmatch 9import logging 10import markdown 11import mimetypes 12import os 13import re 14import textwrap 15 16from multiqc.utils import report, config, util_functions 17 18logger = logging.getLogger(__name__) 19 20 21class BaseMultiqcModule(object): 22 def __init__( 23 self, 24 name="base", 25 anchor="base", 26 target=None, 27 href=None, 28 info=None, 29 comment=None, 30 extra=None, 31 autoformat=True, 32 autoformat_type="markdown", 33 ): 34 35 # Custom options from user config that can overwrite base module values 36 mod_cust_config = getattr(self, "mod_cust_config", {}) 37 self.name = mod_cust_config.get("name", name) 38 self.anchor = mod_cust_config.get("anchor", anchor) 39 target = mod_cust_config.get("target", target) 40 self.href = mod_cust_config.get("href", href) 41 self.info = mod_cust_config.get("info", info) 42 self.comment = mod_cust_config.get("comment", comment) 43 self.extra = mod_cust_config.get("extra", extra) 44 # Specific module level config to overwrite (e.g. config.bcftools, config.fastqc) 45 config.update({anchor: mod_cust_config.get("custom_config", {})}) 46 47 # Sanitise anchor ID and check for duplicates 48 self.anchor = report.save_htmlid(self.anchor) 49 50 # See if we have a user comment in the config 51 if self.anchor in config.section_comments: 52 self.comment = config.section_comments[self.anchor] 53 54 if self.info is None: 55 self.info = "" 56 if self.extra is None: 57 self.extra = "" 58 if target is None: 59 target = self.name 60 if self.href is not None: 61 self.mname = '<a href="{}" target="_blank">{}</a>'.format(self.href, target) 62 else: 63 self.mname = target 64 if self.href or self.info or self.extra: 65 self.intro = "<p>{} {}</p>{}".format(self.mname, self.info, self.extra) 66 67 # Format the markdown strings 68 if autoformat: 69 if self.comment is not None: 70 self.comment = textwrap.dedent(self.comment) 71 if autoformat_type == "markdown": 72 self.comment = markdown.markdown(self.comment) 73 74 self.sections = list() 75 76 def find_log_files(self, sp_key, filecontents=True, filehandles=False): 77 """ 78 Return matches log files of interest. 79 :param sp_key: Search pattern key specified in config 80 :param filehandles: Set to true to return a file handle instead of slurped file contents 81 :return: Yields a dict with filename (fn), root directory (root), cleaned sample name 82 generated from the filename (s_name) and either the file contents or file handle 83 for the current matched file (f). 84 As yield is used, the results can be iterated over without loading all files at once 85 """ 86 87 # Pick up path filters if specified. 88 # Allows modules to be called multiple times with different sets of files 89 path_filters = getattr(self, "mod_cust_config", {}).get("path_filters") 90 path_filters_exclude = getattr(self, "mod_cust_config", {}).get("path_filters_exclude") 91 92 # Old, depreciated syntax support. Likely to be removed in a future version. 93 if isinstance(sp_key, dict): 94 report.files[self.name] = list() 95 for sf in report.searchfiles: 96 if report.search_file(sp_key, {"fn": sf[0], "root": sf[1]}, module_key=None): 97 report.files[self.name].append({"fn": sf[0], "root": sf[1]}) 98 sp_key = self.name 99 logwarn = "Depreciation Warning: {} - Please use new style for find_log_files()".format(self.name) 100 if len(report.files[self.name]) > 0: 101 logger.warning(logwarn) 102 else: 103 logger.debug(logwarn) 104 elif not isinstance(sp_key, str): 105 logger.warning("Did not understand find_log_files() search key") 106 return 107 108 for f in report.files[sp_key]: 109 # Make a note of the filename so that we can report it if something crashes 110 report.last_found_file = os.path.join(f["root"], f["fn"]) 111 112 # Filter out files based on exclusion patterns 113 if path_filters_exclude and len(path_filters_exclude) > 0: 114 exlusion_hits = (fnmatch.fnmatch(report.last_found_file, pfe) for pfe in path_filters_exclude) 115 if any(exlusion_hits): 116 logger.debug( 117 "{} - Skipping '{}' as it matched the path_filters_exclude for '{}'".format( 118 sp_key, f["fn"], self.name 119 ) 120 ) 121 continue 122 123 # Filter out files based on inclusion patterns 124 if path_filters and len(path_filters) > 0: 125 inclusion_hits = (fnmatch.fnmatch(report.last_found_file, pf) for pf in path_filters) 126 if not any(inclusion_hits): 127 logger.debug( 128 "{} - Skipping '{}' as it didn't match the path_filters for '{}'".format( 129 sp_key, f["fn"], self.name 130 ) 131 ) 132 continue 133 else: 134 logger.debug( 135 "{} - Selecting '{}' as it matched the path_filters for '{}'".format(sp_key, f["fn"], self.name) 136 ) 137 138 # Make a sample name from the filename 139 f["s_name"] = self.clean_s_name(f["fn"], f["root"]) 140 if filehandles or filecontents: 141 try: 142 # Custom content module can now handle image files 143 (ftype, encoding) = mimetypes.guess_type(os.path.join(f["root"], f["fn"])) 144 if ftype is not None and ftype.startswith("image"): 145 with io.open(os.path.join(f["root"], f["fn"]), "rb") as fh: 146 # always return file handles 147 f["f"] = fh 148 yield f 149 else: 150 # Everything else - should be all text files 151 with io.open(os.path.join(f["root"], f["fn"]), "r", encoding="utf-8") as fh: 152 if filehandles: 153 f["f"] = fh 154 yield f 155 elif filecontents: 156 f["f"] = fh.read() 157 yield f 158 except (IOError, OSError, ValueError, UnicodeDecodeError) as e: 159 if config.report_readerrors: 160 logger.debug("Couldn't open filehandle when returning file: {}\n{}".format(f["fn"], e)) 161 f["f"] = None 162 else: 163 yield f 164 165 def add_section( 166 self, 167 name=None, 168 anchor=None, 169 description="", 170 comment="", 171 helptext="", 172 plot="", 173 content="", 174 autoformat=True, 175 autoformat_type="markdown", 176 ): 177 """ Add a section to the module report output """ 178 179 # Default anchor 180 if anchor is None: 181 if name is not None: 182 nid = name.lower().strip().replace(" ", "-") 183 anchor = "{}-{}".format(self.anchor, nid) 184 else: 185 sl = len(self.sections) + 1 186 anchor = "{}-section-{}".format(self.anchor, sl) 187 188 # Append custom module anchor to the section if set 189 mod_cust_config = getattr(self, "mod_cust_config", {}) 190 if "anchor" in mod_cust_config: 191 anchor = "{}_{}".format(mod_cust_config["anchor"], anchor) 192 193 # Sanitise anchor ID and check for duplicates 194 anchor = report.save_htmlid(anchor) 195 196 # Skip if user has a config to remove this module section 197 if anchor in config.remove_sections: 198 logger.debug("Skipping section '{}' because specified in user config".format(anchor)) 199 return 200 201 # See if we have a user comment in the config 202 if anchor in config.section_comments: 203 comment = config.section_comments[anchor] 204 205 # Format the content 206 if autoformat: 207 if len(description) > 0: 208 description = textwrap.dedent(description) 209 if autoformat_type == "markdown": 210 description = markdown.markdown(description) 211 if len(comment) > 0: 212 comment = textwrap.dedent(comment) 213 if autoformat_type == "markdown": 214 comment = markdown.markdown(comment) 215 if len(helptext) > 0: 216 helptext = textwrap.dedent(helptext) 217 if autoformat_type == "markdown": 218 helptext = markdown.markdown(helptext) 219 220 # Strip excess whitespace 221 description = description.strip() 222 comment = comment.strip() 223 helptext = helptext.strip() 224 225 self.sections.append( 226 { 227 "name": name, 228 "anchor": anchor, 229 "description": description, 230 "comment": comment, 231 "helptext": helptext, 232 "plot": plot, 233 "content": content, 234 "print_section": any( 235 [n is not None and len(n) > 0 for n in [description, comment, helptext, plot, content]] 236 ), 237 } 238 ) 239 240 def clean_s_name(self, s_name, root): 241 """Helper function to take a long file name and strip it 242 back to a clean sample name. Somewhat arbitrary. 243 :param s_name: The sample name to clean 244 :param root: The directory path that this file is within 245 :config.prepend_dirs: boolean, whether to prepend dir name to s_name 246 :return: The cleaned sample name, ready to be used 247 """ 248 s_name_original = s_name 249 if root is None: 250 root = "" 251 252 # if s_name comes from file contents, it may have a file path 253 # For consistency with other modules, we keep just the basename 254 s_name = os.path.basename(s_name) 255 256 if config.fn_clean_sample_names: 257 # Split then take first section to remove everything after these matches 258 for ext in config.fn_clean_exts: 259 # Check if this config is limited to a module 260 if "module" in ext: 261 if type(ext["module"]) is str: 262 ext["module"] = [ext["module"]] 263 if not any([m == self.anchor for m in ext["module"]]): 264 continue 265 266 # Go through different filter types 267 if type(ext) is str: 268 ext = {"type": "truncate", "pattern": ext} 269 if ext.get("type") == "truncate": 270 s_name = s_name.split(ext["pattern"], 1)[0] 271 elif ext.get("type") in ("remove", "replace"): 272 if ext["type"] == "replace": 273 logger.warning( 274 "use 'config.fn_clean_sample_names.remove' instead " 275 "of 'config.fn_clean_sample_names.replace' [deprecated]" 276 ) 277 s_name = s_name.replace(ext["pattern"], "") 278 elif ext.get("type") == "regex": 279 s_name = re.sub(ext["pattern"], "", s_name) 280 elif ext.get("type") == "regex_keep": 281 match = re.search(ext["pattern"], s_name) 282 s_name = match.group() if match else s_name 283 elif ext.get("type") is None: 284 logger.error('config.fn_clean_exts config was missing "type" key: {}'.format(ext)) 285 else: 286 logger.error("Unrecognised config.fn_clean_exts type: {}".format(ext.get("type"))) 287 # Trim off characters at the end of names 288 for chrs in config.fn_clean_trim: 289 if s_name.endswith(chrs): 290 s_name = s_name[: -len(chrs)] 291 if s_name.startswith(chrs): 292 s_name = s_name[len(chrs) :] 293 294 # Prepend sample name with directory 295 if config.prepend_dirs: 296 sep = config.prepend_dirs_sep 297 root = root.lstrip(".{}".format(os.sep)) 298 dirs = [d.strip() for d in root.split(os.sep) if d.strip() != ""] 299 if config.prepend_dirs_depth != 0: 300 d_idx = config.prepend_dirs_depth * -1 301 if config.prepend_dirs_depth > 0: 302 dirs = dirs[d_idx:] 303 else: 304 dirs = dirs[:d_idx] 305 if len(dirs) > 0: 306 s_name = "{}{}{}".format(sep.join(dirs), sep, s_name) 307 308 # Remove trailing whitespace 309 s_name = s_name.strip() 310 if s_name == "": 311 s_name = s_name_original 312 313 return s_name 314 315 def ignore_samples(self, data): 316 """ Strip out samples which match `sample_names_ignore` """ 317 try: 318 if isinstance(data, OrderedDict): 319 newdata = OrderedDict() 320 elif isinstance(data, dict): 321 newdata = dict() 322 else: 323 return data 324 for s_name, v in data.items(): 325 if not self.is_ignore_sample(s_name): 326 newdata[s_name] = v 327 return newdata 328 except (TypeError, AttributeError): 329 return data 330 331 def is_ignore_sample(self, s_name): 332 """ Should a sample name be ignored? """ 333 glob_match = any(fnmatch.fnmatch(s_name, sn) for sn in config.sample_names_ignore) 334 re_match = any(re.match(sn, s_name) for sn in config.sample_names_ignore_re) 335 return glob_match or re_match 336 337 def general_stats_addcols(self, data, headers=None, namespace=None): 338 """Helper function to add to the General Statistics variable. 339 Adds to report.general_stats and does not return anything. Fills 340 in required config variables if not supplied. 341 :param data: A dict with the data. First key should be sample name, 342 then the data key, then the data. 343 :param headers: Dict / OrderedDict with information for the headers, 344 such as colour scales, min and max values etc. 345 See docs/writing_python.md for more information. 346 :return: None 347 """ 348 if headers is None: 349 headers = {} 350 # Use the module namespace as the name if not supplied 351 if namespace is None: 352 namespace = self.name 353 354 # Guess the column headers from the data if not supplied 355 if headers is None or len(headers) == 0: 356 hs = set() 357 for d in data.values(): 358 hs.update(d.keys()) 359 hs = list(hs) 360 hs.sort() 361 headers = OrderedDict() 362 for k in hs: 363 headers[k] = dict() 364 365 # Add the module name to the description if not already done 366 keys = headers.keys() 367 for k in keys: 368 if "namespace" not in headers[k]: 369 headers[k]["namespace"] = namespace 370 if "description" not in headers[k]: 371 headers[k]["description"] = headers[k].get("title", k) 372 373 # Append to report.general_stats for later assembly into table 374 report.general_stats_data.append(data) 375 report.general_stats_headers.append(headers) 376 377 def add_data_source(self, f=None, s_name=None, source=None, module=None, section=None): 378 try: 379 if module is None: 380 module = self.name 381 if section is None: 382 section = "all_sections" 383 if s_name is None: 384 s_name = f["s_name"] 385 if source is None: 386 source = os.path.abspath(os.path.join(f["root"], f["fn"])) 387 report.data_sources[module][section][s_name] = source 388 except AttributeError: 389 logger.warning("Tried to add data source for {}, but was missing fields data".format(self.name)) 390 391 def write_data_file(self, data, fn, sort_cols=False, data_format=None): 392 """Saves raw data to a dictionary for downstream use, then redirects 393 to report.write_data_file() to create the file in the report directory""" 394 395 # Append custom module anchor if set 396 mod_cust_config = getattr(self, "mod_cust_config", {}) 397 if "anchor" in mod_cust_config: 398 fn = "{}_{}".format(fn, mod_cust_config["anchor"]) 399 400 # Generate a unique filename if the file already exists (running module multiple times) 401 i = 1 402 base_fn = fn 403 while fn in report.saved_raw_data: 404 fn = "{}_{}".format(base_fn, i) 405 i += 1 406 407 # Save the file 408 report.saved_raw_data[fn] = data 409 util_functions.write_data_file(data, fn, sort_cols, data_format) 410 411 ################################################## 412 #### DEPRECATED FORWARDERS 413 def plot_bargraph(self, data, cats=None, pconfig=None): 414 """ Depreciated function. Forwards to new location. """ 415 from multiqc.plots import bargraph 416 417 if pconfig is None: 418 pconfig = {} 419 return bargraph.plot(data, cats, pconfig) 420 421 def plot_xy_data(self, data, pconfig=None): 422 """ Depreciated function. Forwards to new location. """ 423 from multiqc.plots import linegraph 424 425 if pconfig is None: 426 pconfig = {} 427 return linegraph.plot(data, pconfig) 428