1# -*- coding: utf-8 -*- 2""" 3WIN/DATAMARK format bindings to ObsPy. 4""" 5from __future__ import (absolute_import, division, print_function, 6 unicode_literals) 7from future.builtins import * # NOQA 8from future.utils import native_str 9 10import warnings 11 12import numpy as np 13 14from obspy import Stream, Trace, UTCDateTime 15from obspy.core.compatibility import from_buffer 16 17 18def _is_win(filename, century="20"): # @UnusedVariable 19 """ 20 Checks whether a file is WIN or not. 21 22 :type filename: str 23 :param filename: WIN file to be checked. 24 :rtype: bool 25 :return: ``True`` if a WIN file. 26 """ 27 # as long we don't have full format description we just try to read the 28 # file like _read_win and check for errors 29 century = "20" # hardcoded ;( 30 try: 31 with open(filename, "rb") as fpin: 32 fpin.read(4) 33 buff = fpin.read(6) 34 yy = "%s%02x" % (century, ord(buff[0:1])) 35 mm = "%x" % ord(buff[1:2]) 36 dd = "%x" % ord(buff[2:3]) 37 hh = "%x" % ord(buff[3:4]) 38 mi = "%x" % ord(buff[4:5]) 39 sec = "%x" % ord(buff[5:6]) 40 41 # This will raise for invalid dates. 42 UTCDateTime(int(yy), int(mm), int(dd), int(hh), int(mi), 43 int(sec)) 44 buff = fpin.read(4) 45 '%02x' % ord(buff[0:1]) 46 '%02x' % ord(buff[1:2]) 47 int('%x' % (ord(buff[2:3]) >> 4)) 48 ord(buff[3:4]) 49 idata00 = fpin.read(4) 50 from_buffer(idata00, native_str('>i'))[0] 51 except Exception: 52 return False 53 return True 54 55 56def _read_win(filename, century="20", **kwargs): # @UnusedVariable 57 """ 58 Reads a WIN file and returns a Stream object. 59 60 .. warning:: 61 This function should NOT be called directly, it registers via the 62 ObsPy :func:`~obspy.core.stream.read` function, call this instead. 63 64 :type filename: str 65 :param filename: WIN file to be read. 66 :param century: WIN stores year as 2 numbers, need century to 67 construct proper datetime. 68 :rtype: :class:`~obspy.core.stream.Stream` 69 :returns: Stream object containing header and data. 70 """ 71 output = {} 72 srates = {} 73 74 # read win file 75 with open(filename, "rb") as fpin: 76 fpin.seek(0, 2) 77 sz = fpin.tell() 78 fpin.seek(0) 79 leng = 0 80 status0 = 0 81 start = 0 82 while leng < sz: 83 pklen = fpin.read(4) 84 if len(pklen) < 4: 85 break 86 leng = 4 87 truelen = from_buffer(pklen, native_str('>i'))[0] 88 if truelen == 0: 89 break 90 buff = fpin.read(6) 91 leng += 6 92 93 yy = "%s%02x" % (century, ord(buff[0:1])) 94 mm = "%x" % ord(buff[1:2]) 95 dd = "%x" % ord(buff[2:3]) 96 hh = "%x" % ord(buff[3:4]) 97 mi = "%x" % ord(buff[4:5]) 98 sec = "%x" % ord(buff[5:6]) 99 100 date = UTCDateTime(int(yy), int(mm), int(dd), int(hh), int(mi), 101 int(sec)) 102 if start == 0: 103 start = date 104 if status0 == 0: 105 sdata = None 106 while leng < truelen: 107 buff = fpin.read(4) 108 leng += 4 109 flag = '%02x' % ord(buff[0:1]) 110 chanum = '%02x' % ord(buff[1:2]) 111 chanum = "%02s%02s" % (flag, chanum) 112 datawide = int('%x' % (ord(buff[2:3]) >> 4)) 113 srate = ord(buff[3:4]) 114 xlen = (srate - 1) * datawide 115 if datawide == 0: 116 xlen = srate // 2 117 datawide = 0.5 118 119 idata00 = fpin.read(4) 120 leng += 4 121 idata22 = from_buffer(idata00, native_str('>i'))[0] 122 123 if chanum in output: 124 output[chanum].append(idata22) 125 else: 126 output[chanum] = [idata22, ] 127 srates[chanum] = srate 128 sdata = fpin.read(xlen) 129 leng += xlen 130 131 if len(sdata) < xlen: 132 fpin.seek(-(xlen - len(sdata)), 1) 133 sdata += fpin.read(xlen - len(sdata)) 134 msg = "This shouldn't happen, it's weird..." 135 warnings.warn(msg) 136 137 if datawide == 0.5: 138 for i in range(xlen): 139 idata2 = output[chanum][-1] + \ 140 from_buffer(sdata[i:i + 1], np.int8)[0] >> 4 141 output[chanum].append(idata2) 142 idata2 = idata2 +\ 143 (from_buffer(sdata[i:i + 1], 144 np.int8)[0] << 4) >> 4 145 output[chanum].append(idata2) 146 elif datawide == 1: 147 for i in range((xlen // datawide)): 148 idata2 = output[chanum][-1] +\ 149 from_buffer(sdata[i:i + 1], np.int8)[0] 150 output[chanum].append(idata2) 151 elif datawide == 2: 152 for i in range((xlen // datawide)): 153 idata2 = output[chanum][-1] +\ 154 from_buffer(sdata[2 * i:2 * (i + 1)], 155 native_str('>h'))[0] 156 output[chanum].append(idata2) 157 elif datawide == 3: 158 for i in range((xlen // datawide)): 159 idata2 = output[chanum][-1] +\ 160 from_buffer(sdata[3 * i:3 * (i + 1)] + b' ', 161 native_str('>i'))[0] >> 8 162 output[chanum].append(idata2) 163 elif datawide == 4: 164 for i in range((xlen // datawide)): 165 idata2 = output[chanum][-1] +\ 166 from_buffer(sdata[4 * i:4 * (i + 1)], 167 native_str('>i'))[0] 168 output[chanum].append(idata2) 169 else: 170 msg = "DATAWIDE is %s " % datawide + \ 171 "but only values of 0.5, 1, 2, 3 or 4 are supported." 172 raise NotImplementedError(msg) 173 174 traces = [] 175 for i in output.keys(): 176 t = Trace(data=np.array(output[i])) 177 t.stats.channel = str(i) 178 t.stats.sampling_rate = float(srates[i]) 179 t.stats.starttime = start 180 traces.append(t) 181 return Stream(traces=traces) 182