1# Copyright 2004 by Harry Zuzan. All rights reserved.
2# Copyright 2016 by Adam Kurkiewicz. All rights reserved.
3# This file is part of the Biopython distribution and governed by your
4# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
5# Please see the LICENSE file that should have been included as part of this
6# package.
7
8"""Reading information from Affymetrix CEL files version 3 and 4."""
9
10
11import struct
12
13try:
14    import numpy
15except ImportError:
16    from Bio import MissingPythonDependencyError
17
18    raise MissingPythonDependencyError(
19        "Install NumPy if you want to use Bio.Affy.CelFile"
20    ) from None
21
22
23class ParserError(ValueError):
24    """Affymetrix parser error."""
25
26    def __init__(self, *args):
27        """Initialise class."""
28        super().__init__(*args)
29
30
31class Record:
32    """Stores the information in a cel file.
33
34    Example usage:
35
36    >>> from Bio.Affy import CelFile
37    >>> with open("Affy/affy_v3_example.CEL") as handle:
38    ...     c = CelFile.read(handle)
39    ...
40    >>> print(c.ncols, c.nrows)
41    5 5
42    >>> print(c.intensities)
43    [[   234.    170.  22177.    164.  22104.]
44     [   188.    188.  21871.    168.  21883.]
45     [   188.    193.  21455.    198.  21300.]
46     [   188.    182.  21438.    188.  20945.]
47     [   193.  20370.    174.  20605.    168.]]
48    >>> print(c.stdevs)
49    [[   24.     34.5  2669.     19.7  3661.2]
50     [   29.8    29.8  2795.9    67.9  2792.4]
51     [   29.8    88.7  2976.5    62.   2914.5]
52     [   29.8    76.2  2759.5    49.2  2762. ]
53     [   38.8  2611.8    26.6  2810.7    24.1]]
54    >>> print(c.npix)
55    [[25 25 25 25 25]
56     [25 25 25 25 25]
57     [25 25 25 25 25]
58     [25 25 25 25 25]
59     [25 25 25 25 25]]
60
61    """
62
63    def __init__(self):
64        """Initialize the class."""
65        self.version = None
66        self.GridCornerUL = None
67        self.GridCornerUR = None
68        self.GridCornerLR = None
69        self.GridCornerLL = None
70        self.DatHeader = None
71        self.Algorithm = None
72        self.AlgorithmParameters = None
73        self.NumberCells = None
74        self.intensities = None
75        self.stdevs = None
76        self.npix = None
77        self.nrows = None
78        self.ncols = None
79        self.nmask = None
80        self.mask = None
81        self.noutliers = None
82        self.outliers = None
83        self.modified = None
84
85
86def read(handle, version=None):
87    """Read Affymetrix CEL file and return Record object.
88
89    CEL files format versions 3 and 4 are supported.
90    Please specify the CEL file format as 3 or 4 if known for the version
91    argument. If the version number is not specified, the parser will attempt
92    to detect the version from the file contents.
93
94    The Record object returned by this function stores the intensities from
95    the CEL file in record.intensities.
96    Currently, record.mask and record.outliers are not set in when parsing
97    version 4 CEL files.
98
99    Example Usage:
100
101    >>> from Bio.Affy import CelFile
102    >>> with open("Affy/affy_v3_example.CEL") as handle:
103    ...     record = CelFile.read(handle)
104    ...
105    >>> record.version == 3
106    True
107    >>> print("%i by %i array" % record.intensities.shape)
108    5 by 5 array
109
110    >>> with open("Affy/affy_v4_example.CEL", "rb") as handle:
111    ...     record = CelFile.read(handle, version=4)
112    ...
113    >>> record.version == 4
114    True
115    >>> print("%i by %i array" % record.intensities.shape)
116    5 by 5 array
117
118    """
119    try:
120        data = handle.read(0)
121    except AttributeError:
122        raise ValueError("handle should be a file handle") from None
123    data = handle.read(4)
124    if not data:
125        raise ValueError("Empty file.")
126    if data == b"[CEL":
127        raise ValueError("CEL file in version 3 format should be opened in text mode")
128    if data == "[CEL":
129        # Version 3 format. Continue to read the header here before passing
130        # control to _read_v3 to avoid having to seek to the beginning of
131        # the file.
132        data += next(handle)
133        if data.strip() != "[CEL]":
134            raise ValueError("Failed to parse Affy Version 3 CEL file.")
135        line = next(handle)
136        keyword, value = line.split("=", 1)
137        if keyword != "Version":
138            raise ValueError("Failed to parse Affy Version 3 CEL file.")
139        version = int(value)
140        if version != 3:
141            raise ValueError("Incorrect version number in Affy Version 3 CEL file.")
142        return _read_v3(handle)
143    try:
144        magicNumber = struct.unpack("<i", data)
145    except TypeError:
146        raise ValueError(
147            "CEL file in version 4 format should be opened in binary mode"
148        ) from None
149    except struct.error:
150        raise ValueError(
151            "Failed to read magic number from Affy Version 4 CEL file"
152        ) from None
153    if magicNumber != (64,):
154        raise ValueError("Incorrect magic number in Affy Version 4 CEL file")
155    return _read_v4(handle)
156
157
158def _read_v4(f):
159    # We follow the documentation here:
160    # http://www.affymetrix.com/estore/support/developer/powertools/changelog/gcos-agcc/cel.html.affx
161    record = Record()
162    preHeaders = ["version", "columns", "rows", "cellNo", "headerLen"]
163    preHeadersMap = {}
164    headersMap = {}
165
166    # Load pre-headers. The magic number was already parsed in the read
167    # function calling _read_v4.
168    preHeadersMap["magic"] = 64
169    try:
170        for name in preHeaders:
171            preHeadersMap[name] = struct.unpack("<i", f.read(4))[0]
172    except struct.error:
173        raise ParserError("Failed to parse CEL version 4 file") from None
174
175    char = f.read(preHeadersMap["headerLen"])
176    header = char.decode("ascii", "ignore")
177    for header in header.split("\n"):
178        if "=" in header:
179            header = header.split("=")
180            headersMap[header[0]] = "=".join(header[1:])
181
182    record.version = preHeadersMap["version"]
183    if record.version != 4:
184        raise ParserError("Incorrect version number in CEL version 4 file")
185
186    record.GridCornerUL = headersMap["GridCornerUL"]
187    record.GridCornerUR = headersMap["GridCornerUR"]
188    record.GridCornerLR = headersMap["GridCornerLR"]
189    record.GridCornerLL = headersMap["GridCornerLL"]
190    record.DatHeader = headersMap["DatHeader"]
191    record.Algorithm = headersMap["Algorithm"]
192    record.AlgorithmParameters = headersMap["AlgorithmParameters"]
193    record.NumberCells = preHeadersMap["cellNo"]
194    # record.intensities are set below
195    # record.stdevs are set below
196    # record.npix are set below
197    record.nrows = int(headersMap["Rows"])
198    record.ncols = int(headersMap["Cols"])
199
200    # These cannot be reliably set in v4, because of discrepancies between real
201    # data and the documented format.
202    record.nmask = None
203    record.mask = None
204    record.noutliers = None
205    record.outliers = None
206    record.modified = None
207
208    # Real data never seems to have anything but zeros here, but we don't want
209    # to take chances. Raising an error is better than returning unreliable
210    # data.
211    def raiseBadHeader(field, expected):
212        actual = int(headersMap[field])
213        message = f"The header {field} is expected to be 0, not {actual}"
214        if actual != expected:
215            raise ParserError(message)
216
217    raiseBadHeader("Axis-invertX", 0)
218
219    raiseBadHeader("AxisInvertY", 0)
220
221    raiseBadHeader("OffsetX", 0)
222
223    raiseBadHeader("OffsetY", 0)
224
225    # This is unfortunately undocumented, but it turns out that real data has
226    # the record.AlgorithmParameters repeated in the data section, until an
227    # EOF, i.e. b"\x04".
228    char = b"\x00"
229    safetyValve = 10 ** 4
230    for i in range(safetyValve):
231        char = f.read(1)
232        # For debugging
233        # print([i for i in char], end="")
234        if char == b"\x04":
235            break
236        if i == safetyValve:
237            raise ParserError(
238                "Parse Error. The parser expects a short, "
239                "undocumented binary blob terminating with "
240                "ASCII EOF, x04"
241            )
242
243    # After that there are precisely 15 bytes padded. Again, undocumented.
244    padding = f.read(15)
245
246    # That's how we pull out the values (triplets of the form float, float,
247    # signed short).
248    structa = struct.Struct("< f f h")
249
250    # There are 10 bytes in our struct.
251    structSize = 10
252
253    # We initialize the most important: intensities, stdevs and npixs.
254    record.intensities = numpy.empty(record.NumberCells, dtype=float)
255    record.stdevs = numpy.empty(record.NumberCells, dtype=float)
256    record.npix = numpy.empty(record.NumberCells, dtype=int)
257
258    b = f.read(structSize * record.NumberCells)
259    for i in range(record.NumberCells):
260        binaryFragment = b[i * structSize : (i + 1) * structSize]
261        intensity, stdevs, npix = structa.unpack(binaryFragment)
262        record.intensities[i] = intensity
263        record.stdevs[i] = stdevs
264        record.npix[i] = npix
265
266    # reshape without copying.
267    def reshape(array):
268        view = array.view()
269        view.shape = (record.nrows, record.ncols)
270        return view
271
272    record.intensities = reshape(record.intensities)
273    record.stdevs = reshape(record.stdevs)
274    record.npix = reshape(record.npix)
275
276    return record
277
278
279def _read_v3(handle):
280    # Needs error handling.
281    # Needs to know the chip design.
282    record = Record()
283    # The version number was already obtained when the read function calling
284    # _read_v3 parsed the CEL section.
285    record.version = 3
286    section = ""
287    for line in handle:
288        line = line.rstrip("\r\n")
289        if not line:
290            continue
291        # Set current section
292        if line.startswith("[HEADER]"):
293            section = "HEADER"
294        elif line.startswith("[INTENSITY]"):
295            section = "INTENSITY"
296            record.intensities = numpy.zeros((record.nrows, record.ncols))
297            record.stdevs = numpy.zeros((record.nrows, record.ncols))
298            record.npix = numpy.zeros((record.nrows, record.ncols), int)
299        elif line.startswith("[MASKS]"):
300            section = "MASKS"
301            record.mask = numpy.zeros((record.nrows, record.ncols), bool)
302        elif line.startswith("[OUTLIERS]"):
303            section = "OUTLIERS"
304            record.outliers = numpy.zeros((record.nrows, record.ncols), bool)
305        elif line.startswith("[MODIFIED]"):
306            section = "MODIFIED"
307            record.modified = numpy.zeros((record.nrows, record.ncols))
308        elif line.startswith("["):
309            raise ParserError("Unknown section found in version 3 CEL file")
310        else:  # read the data in a section
311            if section == "HEADER":
312                # Set record.ncols and record.nrows, remaining data goes into
313                # record.header dict
314                key, value = line.split("=", 1)
315                if key == "Cols":
316                    record.ncols = int(value)
317                elif key == "Rows":
318                    record.nrows = int(value)
319                elif key == "GridCornerUL":
320                    x, y = value.split()
321                    record.GridCornerUL = (int(x), int(y))
322                elif key == "GridCornerUR":
323                    x, y = value.split()
324                    record.GridCornerUR = (int(x), int(y))
325                elif key == "GridCornerLR":
326                    x, y = value.split()
327                    record.GridCornerLR = (int(x), int(y))
328                elif key == "GridCornerLL":
329                    x, y = value.split()
330                    record.GridCornerLL = (int(x), int(y))
331                elif key == "DatHeader":
332                    # not sure if all parameters here are interpreted correctly
333                    record.DatHeader = {}
334                    index = line.find(":")
335                    _, filename = line[:index].split()
336                    record.DatHeader["filename"] = filename
337                    index += 1
338                    field = line[index : index + 9]
339                    assert field[:4] == "CLS="
340                    assert field[8] == " "
341                    record.DatHeader["CLS"] = int(field[4:8])
342                    index += 9
343                    field = line[index : index + 9]
344                    assert field[:4] == "RWS="
345                    assert field[8] == " "
346                    record.DatHeader["RWS"] = int(field[4:8])
347                    index += 9
348                    field = line[index : index + 7]
349                    assert field[:4] == "XIN="
350                    assert field[6] == " "
351                    record.DatHeader["XIN"] = int(field[4:6])
352                    index += 7
353                    field = line[index : index + 7]
354                    assert field[:4] == "YIN="
355                    assert field[6] == " "
356                    record.DatHeader["YIN"] = int(field[4:6])
357                    index += 7
358                    field = line[index : index + 6]
359                    assert field[:3] == "VE="
360                    assert field[5] == " "
361                    record.DatHeader["VE"] = int(field[3:5])
362                    index += 6
363                    field = line[index : index + 7]
364                    assert field[6] == " "
365                    temperature = field[:6].strip()
366                    if temperature:
367                        record.DatHeader["temperature"] = int(temperature)
368                    else:
369                        record.DatHeader["temperature"] = None
370                    index += 7
371                    field = line[index : index + 4]
372                    assert field.endswith(" ")
373                    record.DatHeader["laser-power"] = float(field)
374                    index += 4
375                    field = line[index : index + 18]
376                    assert field[8] == " "
377                    record.DatHeader["scan-date"] = field[:8]
378                    assert field[17] == " "
379                    record.DatHeader["scan-time"] = field[9:17]
380                    index += 18
381                    field = line[index:]
382                    subfields = field.split(" \x14 ")
383                    assert len(subfields) == 12
384                    subfield = subfields[0]
385                    try:
386                        scanner_id, scanner_type = subfield.split()
387                    except ValueError:
388                        scanner_id = subfield.strip()
389                    record.DatHeader["scanner-id"] = scanner_id
390                    record.DatHeader["scanner-type"] = scanner_type
391                    record.DatHeader["array-type"] = subfields[2]
392                    record.DatHeader["image-orientation"] = int(subfields[11])
393                elif key == "Algorithm":
394                    record.Algorithm = value
395                elif key == "AlgorithmParameters":
396                    parameters = value.split(";")
397                    values = {}
398                    for parameter in parameters:
399                        key, value = parameter.split(":", 1)
400                        if key in (
401                            "Percentile",
402                            "CellMargin",
403                            "FullFeatureWidth",
404                            "FullFeatureHeight",
405                            "PoolWidthExtenstion",
406                            "PoolHeightExtension",
407                        ):
408                            values[key] = int(value)
409                        elif key in ("OutlierHigh", "OutlierLow", "StdMult"):
410                            values[key] = float(value)
411                        elif key in (
412                            "FixedCellSize",
413                            "IgnoreOutliersInShiftRows",
414                            "FeatureExtraction",
415                            "UseSubgrids",
416                            "RandomizePixels",
417                        ):
418                            if value == "TRUE":
419                                value = True
420                            elif value == "FALSE":
421                                value = False
422                            else:
423                                raise ValueError("Unexpected boolean value")
424                            values[key] = value
425                        elif key in ("AlgVersion", "ErrorBasis"):
426                            values[key] = value
427                        else:
428                            raise ValueError("Unexpected tag in AlgorithmParameters")
429                    record.AlgorithmParameters = values
430            elif section == "INTENSITY":
431                if line.startswith("NumberCells="):
432                    key, value = line.split("=", 1)
433                    record.NumberCells = int(value)
434                elif line.startswith("CellHeader="):
435                    key, value = line.split("=", 1)
436                    if value.split() != ["X", "Y", "MEAN", "STDV", "NPIXELS"]:
437                        raise ParserError(
438                            "Unexpected CellHeader in INTENSITY "
439                            "section CEL version 3 file"
440                        )
441                else:
442                    words = line.split()
443                    y = int(words[0])
444                    x = int(words[1])
445                    record.intensities[x, y] = float(words[2])
446                    record.stdevs[x, y] = float(words[3])
447                    record.npix[x, y] = int(words[4])
448            elif section == "MASKS":
449                if line.startswith("NumberCells="):
450                    key, value = line.split("=", 1)
451                    record.nmask = int(value)
452                elif line.startswith("CellHeader="):
453                    key, value = line.split("=", 1)
454                    if value.split() != ["X", "Y"]:
455                        raise ParserError(
456                            "Unexpected CellHeader in MASKS "
457                            "section in CEL version 3 file"
458                        )
459                else:
460                    words = line.split()
461                    y = int(words[0])
462                    x = int(words[1])
463                    record.mask[x, y] = True
464            elif section == "OUTLIERS":
465                if line.startswith("NumberCells="):
466                    key, value = line.split("=", 1)
467                    record.noutliers = int(value)
468                elif line.startswith("CellHeader="):
469                    key, value = line.split("=", 1)
470                    if value.split() != ["X", "Y"]:
471                        raise ParserError(
472                            "Unexpected CellHeader in OUTLIERS "
473                            "section in CEL version 3 file"
474                        )
475                else:
476                    words = line.split()
477                    y = int(words[0])
478                    x = int(words[1])
479                    record.outliers[x, y] = True
480            elif section == "MODIFIED":
481                if line.startswith("NumberCells="):
482                    key, value = line.split("=", 1)
483                    record.nmodified = int(value)
484                elif line.startswith("CellHeader="):
485                    key, value = line.split("=", 1)
486                    if value.split() != ["X", "Y", "ORIGMEAN"]:
487                        raise ParserError(
488                            "Unexpected CellHeader in MODIFIED "
489                            "section in CEL version 3 file"
490                        )
491                else:
492                    words = line.split()
493                    y = int(words[0])
494                    x = int(words[1])
495                    record.modified[x, y] = float(words[2])
496    return record
497
498
499if __name__ == "__main__":
500    from Bio._utils import run_doctest
501
502    run_doctest()
503