1#!/usr/bin/env python
2
3""" MultiQC modules base class, contains helper functions """
4
5from __future__ import print_function
6from collections import OrderedDict
7import io
8import fnmatch
9import logging
10import markdown
11import mimetypes
12import os
13import re
14import textwrap
15
16from multiqc.utils import report, config, util_functions
17
18logger = logging.getLogger(__name__)
19
20
21class BaseMultiqcModule(object):
22    def __init__(
23        self,
24        name="base",
25        anchor="base",
26        target=None,
27        href=None,
28        info=None,
29        comment=None,
30        extra=None,
31        autoformat=True,
32        autoformat_type="markdown",
33    ):
34
35        # Custom options from user config that can overwrite base module values
36        mod_cust_config = getattr(self, "mod_cust_config", {})
37        self.name = mod_cust_config.get("name", name)
38        self.anchor = mod_cust_config.get("anchor", anchor)
39        target = mod_cust_config.get("target", target)
40        self.href = mod_cust_config.get("href", href)
41        self.info = mod_cust_config.get("info", info)
42        self.comment = mod_cust_config.get("comment", comment)
43        self.extra = mod_cust_config.get("extra", extra)
44        # Specific module level config to overwrite (e.g. config.bcftools, config.fastqc)
45        config.update({anchor: mod_cust_config.get("custom_config", {})})
46
47        # Sanitise anchor ID and check for duplicates
48        self.anchor = report.save_htmlid(self.anchor)
49
50        # See if we have a user comment in the config
51        if self.anchor in config.section_comments:
52            self.comment = config.section_comments[self.anchor]
53
54        if self.info is None:
55            self.info = ""
56        if self.extra is None:
57            self.extra = ""
58        if target is None:
59            target = self.name
60        if self.href is not None:
61            self.mname = '<a href="{}" target="_blank">{}</a>'.format(self.href, target)
62        else:
63            self.mname = target
64        if self.href or self.info or self.extra:
65            self.intro = "<p>{} {}</p>{}".format(self.mname, self.info, self.extra)
66
67        # Format the markdown strings
68        if autoformat:
69            if self.comment is not None:
70                self.comment = textwrap.dedent(self.comment)
71                if autoformat_type == "markdown":
72                    self.comment = markdown.markdown(self.comment)
73
74        self.sections = list()
75
76    def find_log_files(self, sp_key, filecontents=True, filehandles=False):
77        """
78        Return matches log files of interest.
79        :param sp_key: Search pattern key specified in config
80        :param filehandles: Set to true to return a file handle instead of slurped file contents
81        :return: Yields a dict with filename (fn), root directory (root), cleaned sample name
82                 generated from the filename (s_name) and either the file contents or file handle
83                 for the current matched file (f).
84                 As yield is used, the results can be iterated over without loading all files at once
85        """
86
87        # Pick up path filters if specified.
88        # Allows modules to be called multiple times with different sets of files
89        path_filters = getattr(self, "mod_cust_config", {}).get("path_filters")
90        path_filters_exclude = getattr(self, "mod_cust_config", {}).get("path_filters_exclude")
91
92        # Old, depreciated syntax support. Likely to be removed in a future version.
93        if isinstance(sp_key, dict):
94            report.files[self.name] = list()
95            for sf in report.searchfiles:
96                if report.search_file(sp_key, {"fn": sf[0], "root": sf[1]}, module_key=None):
97                    report.files[self.name].append({"fn": sf[0], "root": sf[1]})
98            sp_key = self.name
99            logwarn = "Depreciation Warning: {} - Please use new style for find_log_files()".format(self.name)
100            if len(report.files[self.name]) > 0:
101                logger.warning(logwarn)
102            else:
103                logger.debug(logwarn)
104        elif not isinstance(sp_key, str):
105            logger.warning("Did not understand find_log_files() search key")
106            return
107
108        for f in report.files[sp_key]:
109            # Make a note of the filename so that we can report it if something crashes
110            report.last_found_file = os.path.join(f["root"], f["fn"])
111
112            # Filter out files based on exclusion patterns
113            if path_filters_exclude and len(path_filters_exclude) > 0:
114                exlusion_hits = (fnmatch.fnmatch(report.last_found_file, pfe) for pfe in path_filters_exclude)
115                if any(exlusion_hits):
116                    logger.debug(
117                        "{} - Skipping '{}' as it matched the path_filters_exclude for '{}'".format(
118                            sp_key, f["fn"], self.name
119                        )
120                    )
121                    continue
122
123            # Filter out files based on inclusion patterns
124            if path_filters and len(path_filters) > 0:
125                inclusion_hits = (fnmatch.fnmatch(report.last_found_file, pf) for pf in path_filters)
126                if not any(inclusion_hits):
127                    logger.debug(
128                        "{} - Skipping '{}' as it didn't match the path_filters for '{}'".format(
129                            sp_key, f["fn"], self.name
130                        )
131                    )
132                    continue
133                else:
134                    logger.debug(
135                        "{} - Selecting '{}' as it matched the path_filters for '{}'".format(sp_key, f["fn"], self.name)
136                    )
137
138            # Make a sample name from the filename
139            f["s_name"] = self.clean_s_name(f["fn"], f["root"])
140            if filehandles or filecontents:
141                try:
142                    # Custom content module can now handle image files
143                    (ftype, encoding) = mimetypes.guess_type(os.path.join(f["root"], f["fn"]))
144                    if ftype is not None and ftype.startswith("image"):
145                        with io.open(os.path.join(f["root"], f["fn"]), "rb") as fh:
146                            # always return file handles
147                            f["f"] = fh
148                            yield f
149                    else:
150                        # Everything else - should be all text files
151                        with io.open(os.path.join(f["root"], f["fn"]), "r", encoding="utf-8") as fh:
152                            if filehandles:
153                                f["f"] = fh
154                                yield f
155                            elif filecontents:
156                                f["f"] = fh.read()
157                                yield f
158                except (IOError, OSError, ValueError, UnicodeDecodeError) as e:
159                    if config.report_readerrors:
160                        logger.debug("Couldn't open filehandle when returning file: {}\n{}".format(f["fn"], e))
161                        f["f"] = None
162            else:
163                yield f
164
165    def add_section(
166        self,
167        name=None,
168        anchor=None,
169        description="",
170        comment="",
171        helptext="",
172        plot="",
173        content="",
174        autoformat=True,
175        autoformat_type="markdown",
176    ):
177        """ Add a section to the module report output """
178
179        # Default anchor
180        if anchor is None:
181            if name is not None:
182                nid = name.lower().strip().replace(" ", "-")
183                anchor = "{}-{}".format(self.anchor, nid)
184            else:
185                sl = len(self.sections) + 1
186                anchor = "{}-section-{}".format(self.anchor, sl)
187
188        # Append custom module anchor to the section if set
189        mod_cust_config = getattr(self, "mod_cust_config", {})
190        if "anchor" in mod_cust_config:
191            anchor = "{}_{}".format(mod_cust_config["anchor"], anchor)
192
193        # Sanitise anchor ID and check for duplicates
194        anchor = report.save_htmlid(anchor)
195
196        # Skip if user has a config to remove this module section
197        if anchor in config.remove_sections:
198            logger.debug("Skipping section '{}' because specified in user config".format(anchor))
199            return
200
201        # See if we have a user comment in the config
202        if anchor in config.section_comments:
203            comment = config.section_comments[anchor]
204
205        # Format the content
206        if autoformat:
207            if len(description) > 0:
208                description = textwrap.dedent(description)
209                if autoformat_type == "markdown":
210                    description = markdown.markdown(description)
211            if len(comment) > 0:
212                comment = textwrap.dedent(comment)
213                if autoformat_type == "markdown":
214                    comment = markdown.markdown(comment)
215            if len(helptext) > 0:
216                helptext = textwrap.dedent(helptext)
217                if autoformat_type == "markdown":
218                    helptext = markdown.markdown(helptext)
219
220        # Strip excess whitespace
221        description = description.strip()
222        comment = comment.strip()
223        helptext = helptext.strip()
224
225        self.sections.append(
226            {
227                "name": name,
228                "anchor": anchor,
229                "description": description,
230                "comment": comment,
231                "helptext": helptext,
232                "plot": plot,
233                "content": content,
234                "print_section": any(
235                    [n is not None and len(n) > 0 for n in [description, comment, helptext, plot, content]]
236                ),
237            }
238        )
239
240    def clean_s_name(self, s_name, root):
241        """Helper function to take a long file name and strip it
242        back to a clean sample name. Somewhat arbitrary.
243        :param s_name: The sample name to clean
244        :param root: The directory path that this file is within
245        :config.prepend_dirs: boolean, whether to prepend dir name to s_name
246        :return: The cleaned sample name, ready to be used
247        """
248        s_name_original = s_name
249        if root is None:
250            root = ""
251
252        # if s_name comes from file contents, it may have a file path
253        # For consistency with other modules, we keep just the basename
254        s_name = os.path.basename(s_name)
255
256        if config.fn_clean_sample_names:
257            # Split then take first section to remove everything after these matches
258            for ext in config.fn_clean_exts:
259                # Check if this config is limited to a module
260                if "module" in ext:
261                    if type(ext["module"]) is str:
262                        ext["module"] = [ext["module"]]
263                    if not any([m == self.anchor for m in ext["module"]]):
264                        continue
265
266                # Go through different filter types
267                if type(ext) is str:
268                    ext = {"type": "truncate", "pattern": ext}
269                if ext.get("type") == "truncate":
270                    s_name = s_name.split(ext["pattern"], 1)[0]
271                elif ext.get("type") in ("remove", "replace"):
272                    if ext["type"] == "replace":
273                        logger.warning(
274                            "use 'config.fn_clean_sample_names.remove' instead "
275                            "of 'config.fn_clean_sample_names.replace' [deprecated]"
276                        )
277                    s_name = s_name.replace(ext["pattern"], "")
278                elif ext.get("type") == "regex":
279                    s_name = re.sub(ext["pattern"], "", s_name)
280                elif ext.get("type") == "regex_keep":
281                    match = re.search(ext["pattern"], s_name)
282                    s_name = match.group() if match else s_name
283                elif ext.get("type") is None:
284                    logger.error('config.fn_clean_exts config was missing "type" key: {}'.format(ext))
285                else:
286                    logger.error("Unrecognised config.fn_clean_exts type: {}".format(ext.get("type")))
287            # Trim off characters at the end of names
288            for chrs in config.fn_clean_trim:
289                if s_name.endswith(chrs):
290                    s_name = s_name[: -len(chrs)]
291                if s_name.startswith(chrs):
292                    s_name = s_name[len(chrs) :]
293
294        # Prepend sample name with directory
295        if config.prepend_dirs:
296            sep = config.prepend_dirs_sep
297            root = root.lstrip(".{}".format(os.sep))
298            dirs = [d.strip() for d in root.split(os.sep) if d.strip() != ""]
299            if config.prepend_dirs_depth != 0:
300                d_idx = config.prepend_dirs_depth * -1
301                if config.prepend_dirs_depth > 0:
302                    dirs = dirs[d_idx:]
303                else:
304                    dirs = dirs[:d_idx]
305            if len(dirs) > 0:
306                s_name = "{}{}{}".format(sep.join(dirs), sep, s_name)
307
308        # Remove trailing whitespace
309        s_name = s_name.strip()
310        if s_name == "":
311            s_name = s_name_original
312
313        return s_name
314
315    def ignore_samples(self, data):
316        """ Strip out samples which match `sample_names_ignore` """
317        try:
318            if isinstance(data, OrderedDict):
319                newdata = OrderedDict()
320            elif isinstance(data, dict):
321                newdata = dict()
322            else:
323                return data
324            for s_name, v in data.items():
325                if not self.is_ignore_sample(s_name):
326                    newdata[s_name] = v
327            return newdata
328        except (TypeError, AttributeError):
329            return data
330
331    def is_ignore_sample(self, s_name):
332        """ Should a sample name be ignored? """
333        glob_match = any(fnmatch.fnmatch(s_name, sn) for sn in config.sample_names_ignore)
334        re_match = any(re.match(sn, s_name) for sn in config.sample_names_ignore_re)
335        return glob_match or re_match
336
337    def general_stats_addcols(self, data, headers=None, namespace=None):
338        """Helper function to add to the General Statistics variable.
339        Adds to report.general_stats and does not return anything. Fills
340        in required config variables if not supplied.
341        :param data: A dict with the data. First key should be sample name,
342                     then the data key, then the data.
343        :param headers: Dict / OrderedDict with information for the headers,
344                        such as colour scales, min and max values etc.
345                        See docs/writing_python.md for more information.
346        :return: None
347        """
348        if headers is None:
349            headers = {}
350        # Use the module namespace as the name if not supplied
351        if namespace is None:
352            namespace = self.name
353
354        # Guess the column headers from the data if not supplied
355        if headers is None or len(headers) == 0:
356            hs = set()
357            for d in data.values():
358                hs.update(d.keys())
359            hs = list(hs)
360            hs.sort()
361            headers = OrderedDict()
362            for k in hs:
363                headers[k] = dict()
364
365        # Add the module name to the description if not already done
366        keys = headers.keys()
367        for k in keys:
368            if "namespace" not in headers[k]:
369                headers[k]["namespace"] = namespace
370            if "description" not in headers[k]:
371                headers[k]["description"] = headers[k].get("title", k)
372
373        # Append to report.general_stats for later assembly into table
374        report.general_stats_data.append(data)
375        report.general_stats_headers.append(headers)
376
377    def add_data_source(self, f=None, s_name=None, source=None, module=None, section=None):
378        try:
379            if module is None:
380                module = self.name
381            if section is None:
382                section = "all_sections"
383            if s_name is None:
384                s_name = f["s_name"]
385            if source is None:
386                source = os.path.abspath(os.path.join(f["root"], f["fn"]))
387            report.data_sources[module][section][s_name] = source
388        except AttributeError:
389            logger.warning("Tried to add data source for {}, but was missing fields data".format(self.name))
390
391    def write_data_file(self, data, fn, sort_cols=False, data_format=None):
392        """Saves raw data to a dictionary for downstream use, then redirects
393        to report.write_data_file() to create the file in the report directory"""
394
395        # Append custom module anchor if set
396        mod_cust_config = getattr(self, "mod_cust_config", {})
397        if "anchor" in mod_cust_config:
398            fn = "{}_{}".format(fn, mod_cust_config["anchor"])
399
400        # Generate a unique filename if the file already exists (running module multiple times)
401        i = 1
402        base_fn = fn
403        while fn in report.saved_raw_data:
404            fn = "{}_{}".format(base_fn, i)
405            i += 1
406
407        # Save the file
408        report.saved_raw_data[fn] = data
409        util_functions.write_data_file(data, fn, sort_cols, data_format)
410
411    ##################################################
412    #### DEPRECATED FORWARDERS
413    def plot_bargraph(self, data, cats=None, pconfig=None):
414        """ Depreciated function. Forwards to new location. """
415        from multiqc.plots import bargraph
416
417        if pconfig is None:
418            pconfig = {}
419        return bargraph.plot(data, cats, pconfig)
420
421    def plot_xy_data(self, data, pconfig=None):
422        """ Depreciated function. Forwards to new location. """
423        from multiqc.plots import linegraph
424
425        if pconfig is None:
426            pconfig = {}
427        return linegraph.plot(data, pconfig)
428