1# -*- coding: utf-8 -*- 2""" 3SAC module helper functions and data. 4 5""" 6from __future__ import (absolute_import, division, print_function, 7 unicode_literals) 8from future.builtins import * # NOQA 9 10import sys 11import warnings 12 13import numpy as np 14 15from obspy import UTCDateTime 16from obspy.core import Stats 17 18from . import header as HD # noqa 19 20# ------------- DATA ---------------------------------------------------------- 21TWO_DIGIT_YEAR_MSG = ("SAC file with 2-digit year header field encountered. " 22 "This is not supported by the SAC file format standard. " 23 "Prepending '19'.") 24 25 26# ------------- SAC-SPECIFIC EXCEPTIONS --------------------------------------- 27class SacError(Exception): 28 """ 29 Raised if the SAC file is corrupt or if necessary information 30 in the SAC file is missing. 31 """ 32 pass 33 34 35class SacIOError(SacError, IOError): 36 """ 37 Raised if the given SAC file can't be read. 38 """ 39 pass 40 41 42class SacInvalidContentError(SacError): 43 """ 44 Raised if headers and/or data are not valid. 45 """ 46 pass 47 48 49class SacHeaderError(SacError): 50 """ 51 Raised if header has issues. 52 """ 53 pass 54 55 56class SacHeaderTimeError(SacHeaderError, ValueError): 57 """ 58 Raised if header has invalid "nz" times. 59 """ 60 pass 61 62 63# ------------- VALIDITY CHECKS ----------------------------------------------- 64def is_valid_enum_str(hdr, name): 65 # is this a valid string name for this hdr 66 # assume that, if a value isn't in HD.ACCEPTED_VALS, it's not valid 67 if hdr in HD.ACCEPTED_VALS: 68 tf = name in HD.ACCEPTED_VALS[hdr] 69 else: 70 tf = False 71 return tf 72 73 74def is_valid_enum_int(hdr, val, allow_null=True): 75 # is this a valid integer for this hdr. 76 if hdr in HD.ACCEPTED_VALS: 77 accep = [HD.ENUM_VALS[nm] for nm in HD.ACCEPTED_VALS[hdr]] 78 if allow_null: 79 accep += [HD.INULL] 80 tf = val in accep 81 else: 82 tf = False 83 return tf 84 85 86# ------------- GENERAL ------------------------------------------------------- 87def _convert_enum(header, converter, accep): 88 # header : dict, SAC header 89 # converter : dict, {source value: target value} 90 # accep : dict, {header name: acceptable value list} 91 92 # TODO: use functools.partial/wraps? 93 for hdr, val in header.items(): 94 if hdr in HD.ACCEPTED_VALS: 95 if val in accep[hdr]: 96 header[hdr] = converter[val] 97 else: 98 msg = 'Unrecognized enumerated value "{}" for header "{}"' 99 raise ValueError(msg.format(val, hdr)) 100 101 return header 102 103 104def enum_string_to_int(header): 105 """Convert enumerated string values in header dictionary to int values.""" 106 out = _convert_enum(header, converter=HD.ENUM_VALS, accep=HD.ACCEPTED_VALS) 107 return out 108 109 110def enum_int_to_string(header): 111 """Convert enumerated int values in header dictionary to string values.""" 112 out = _convert_enum(header, converter=HD.ENUM_NAMES, accep=HD.ACCEPTED_INT) 113 return out 114 115 116def byteswap(*arrays): 117 """ 118 Swapping of bytes for provided arrays. 119 120 Notes 121 ----- 122 arr.newbyteorder('S') swaps dtype interpretation, but not bytes in memory 123 arr.byteswap() swaps bytes in memory, but not dtype interpretation 124 arr.byteswap(True).newbyteorder('S') completely swaps both 125 126 References 127 ---------- 128 https://docs.scipy.org/doc/numpy/user/basics.byteswapping.html 129 130 """ 131 return [arr.newbyteorder('S') for arr in arrays] 132 133 134def is_same_byteorder(bo1, bo2): 135 """ 136 Deal with all the ways to compare byte order string representations. 137 138 :param bo1: Byte order string. Can be one of {'l', 'little', 'L', '<', 139 'b', 'big', 'B', '>', 'n', 'native','N', '='} 140 :type bo1: str 141 :param bo2: Byte order string. Can be one of {'l', 'little', 'L', '<', 142 'b', 'big', 'B', '>', 'n', 'native','N', '='} 143 :type bo1: str 144 145 :rtype: bool 146 :return: True of same byte order. 147 148 """ 149 # TODO: extend this as is_same_byteorder(*byteorders) using itertools 150 be = ('b', 'big', '>') 151 le = ('l', 'little', '<') 152 ne = ('n', 'native', '=') 153 ok = be + le + ne 154 155 if (bo1.lower() not in ok) or (bo2.lower() not in ok): 156 raise ValueError("Unrecognized byte order string.") 157 158 # make native decide what it is 159 bo1 = sys.byteorder if bo1.lower() in ne else bo1 160 bo2 = sys.byteorder if bo2.lower() in ne else bo2 161 162 return (bo1.lower() in le) == (bo2.lower() in le) 163 164 165def _clean_str(value, strip_whitespace=True): 166 """ 167 Remove null values and whitespace, return a str 168 169 This fn is used in two places: in SACTrace.read, to sanitize strings for 170 SACTrace, and in sac_to_obspy_header, to sanitize strings for making a 171 Trace that the user may have manually added. 172 """ 173 null_term = value.find('\x00') 174 if null_term >= 0: 175 value = value[:null_term] + " " * len(value[null_term:]) 176 177 if strip_whitespace: 178 value = value.strip() 179 180 return value 181 182 183# TODO: do this in SACTrace? 184def sac_to_obspy_header(sacheader): 185 """ 186 Make an ObsPy Stats header dictionary from a SAC header dictionary. 187 188 :param sacheader: SAC header dictionary. 189 :type sacheader: dict 190 191 :rtype: :class:`~obspy.core.Stats` 192 :return: Filled ObsPy Stats header. 193 194 """ 195 196 # 1. get required sac header values 197 try: 198 npts = sacheader['npts'] 199 delta = sacheader['delta'] 200 except KeyError: 201 msg = "Incomplete SAC header information to build an ObsPy header." 202 raise KeyError(msg) 203 204 assert npts != HD.INULL 205 assert delta != HD.FNULL 206 # 207 # 2. get time 208 try: 209 reftime = get_sac_reftime(sacheader) 210 except (SacError, ValueError, TypeError): 211 # ObsPy doesn't require a valid reftime 212 reftime = UTCDateTime(0.0) 213 214 b = sacheader.get('b', HD.FNULL) 215 # 216 # 3. get optional sac header values 217 calib = sacheader.get('scale', HD.FNULL) 218 kcmpnm = sacheader.get('kcmpnm', HD.SNULL) 219 kstnm = sacheader.get('kstnm', HD.SNULL) 220 knetwk = sacheader.get('knetwk', HD.SNULL) 221 khole = sacheader.get('khole', HD.SNULL) 222 # 223 # 4. deal with null values 224 b = b if (b != HD.FNULL) else 0.0 225 calib = calib if (calib != HD.FNULL) else 1.0 226 kcmpnm = kcmpnm if (kcmpnm != HD.SNULL) else '' 227 kstnm = kstnm if (kstnm != HD.SNULL) else '' 228 knetwk = knetwk if (knetwk != HD.SNULL) else '' 229 khole = khole if (khole != HD.SNULL) else '' 230 # 231 # 5. transform to obspy values 232 # nothing is null 233 stats = {} 234 stats['npts'] = npts 235 stats['sampling_rate'] = np.float32(1.) / np.float32(delta) 236 stats['network'] = _clean_str(knetwk) 237 stats['station'] = _clean_str(kstnm) 238 stats['channel'] = _clean_str(kcmpnm) 239 stats['location'] = _clean_str(khole) 240 stats['calib'] = calib 241 242 # store _all_ provided SAC header values 243 stats['sac'] = sacheader.copy() 244 245 # get first sample absolute time as UTCDateTime 246 # always add the begin time (if it's defined) to get the given 247 # SAC reference time, no matter which iztype is given 248 # b may be non-zero, even for iztype 'ib', especially if it was used to 249 # store microseconds from obspy_to_sac_header 250 stats['starttime'] = UTCDateTime(reftime) + b 251 252 return Stats(stats) 253 254 255def split_microseconds(microseconds): 256 # Returns milliseconds and remainder microseconds 257 milliseconds = microseconds // 1000 258 microseconds = (microseconds - milliseconds * 1000) 259 260 return milliseconds, microseconds 261 262 263def utcdatetime_to_sac_nztimes(utcdt): 264 # Returns a dict of integer nz-times and remainder microseconds 265 nztimes = {} 266 nztimes['nzyear'] = utcdt.year 267 nztimes['nzjday'] = utcdt.julday 268 nztimes['nzhour'] = utcdt.hour 269 nztimes['nzmin'] = utcdt.minute 270 nztimes['nzsec'] = utcdt.second 271 # nz times don't have enough precision, so push microseconds into b, 272 # using integer arithmetic 273 millisecond, microsecond = split_microseconds(utcdt.microsecond) 274 nztimes['nzmsec'] = millisecond 275 276 return nztimes, microsecond 277 278 279def obspy_to_sac_header(stats, keep_sac_header=True): 280 """ 281 Merge a primary with a secondary header, reconciling some differences. 282 283 :param stats: Filled ObsPy Stats header 284 :type stats: dict or :class:`~obspy.core.Stats` 285 :param keep_sac_header: If keep_sac_header is True, old stats.sac 286 header values are kept, and a minimal set of values are updated from 287 the stats dictionary according to these guidelines: 288 * npts, delta always come from stats 289 * If a valid old reftime is found, the new b and e will be made 290 and properly referenced to it. All other old SAC headers are simply 291 carried along. 292 * If the old SAC reftime is invalid and relative time headers are set, 293 a SacHeaderError exception will be raised. 294 * If the old SAC reftime is invalid, no relative time headers are set, 295 and "b" is set, "e" is updated from stats and other old SAC headers 296 are carried along. 297 * If the old SAC reftime is invalid, no relative time headers are set, 298 and "b" is not set, the reftime will be set from stats.starttime 299 (with micro/milliseconds precision adjustments) and "b" and "e" are 300 set accordingly. 301 * If 'kstnm', 'knetwk', 'kcmpnm', or 'khole' are not set or differ 302 from Stats values 'station', 'network', 'channel', or 'location', 303 they are taken from the Stats values. 304 If keep_sac_header is False, a new SAC header is constructed from only 305 information found in the Stats dictionary, with some other default 306 values introduced. It will be an iztype 9 ("ib") file, with small 307 reference time adjustments for micro/milliseconds precision issues. 308 SAC headers nvhdr, level, lovrok, and iftype are always produced. 309 :type keep_sac_header: bool 310 :rtype merged: dict 311 :return: SAC header 312 313 """ 314 header = {} 315 oldsac = stats.get('sac', {}) 316 317 if keep_sac_header and oldsac: 318 header.update(oldsac) 319 320 try: 321 reftime = get_sac_reftime(header) 322 except SacHeaderTimeError: 323 reftime = None 324 325 relhdrs = [hdr for hdr in HD.RELHDRS 326 if header.get(hdr) not in (None, HD.FNULL)] 327 328 if reftime: 329 # Set current 'b' relative to the old reftime. 330 b = stats['starttime'] - reftime 331 else: 332 # Invalid reference time. Relative times like 'b' cannot be 333 # unambiguously referenced to stats.starttime. 334 if 'b' in relhdrs: 335 # Assume no trimming/expanding of the Trace occurred relative 336 # to the old 'b', and just use the old 'b' value. 337 b = header['b'] 338 else: 339 # Assume it's an iztype=ib (9) type file. Also set iztype? 340 b = 0 341 342 # Set the stats.starttime as the reftime and set 'b' and 'e'. 343 # ObsPy issue 1204 344 reftime = stats['starttime'] - b 345 nztimes, microsecond = utcdatetime_to_sac_nztimes(reftime) 346 header.update(nztimes) 347 b += (microsecond * 1e-6) 348 349 header['b'] = b 350 header['e'] = b + (stats['endtime'] - stats['starttime']) 351 352 # Merge some values from stats if they're missing in the SAC header 353 # ObsPy issues 1204, 1457 354 # XXX: If Stats values are empty/"" and SAC header values are real, 355 # this will replace the real SAC values with SAC null values. 356 for sachdr, statshdr in [('kstnm', 'station'), ('knetwk', 'network'), 357 ('kcmpnm', 'channel'), ('khole', 'location')]: 358 if (header.get(sachdr) in (None, HD.SNULL)) or \ 359 (header.get(sachdr).strip() != stats[statshdr]): 360 header[sachdr] = stats[statshdr] or HD.SNULL 361 else: 362 # SAC header from Stats only. 363 364 # Here, set headers from Stats that would otherwise depend on the old 365 # SAC header 366 header['iztype'] = 9 367 starttime = stats['starttime'] 368 # nz times don't have enough precision, so push microseconds into b, 369 # using integer arithmetic 370 nztimes, microsecond = utcdatetime_to_sac_nztimes(starttime) 371 header.update(nztimes) 372 373 header['b'] = microsecond * 1e-6 374 375 # we now have correct b, npts, delta, and nz times 376 header['e'] = header['b'] + (stats['npts'] - 1) * stats['delta'] 377 378 header['scale'] = stats.get('calib', HD.FNULL) 379 380 # NOTE: overwrites existing SAC headers 381 # nulls for these are '', which stats.get(hdr, HD.SNULL) won't catch 382 header['kcmpnm'] = stats['channel'] if stats['channel'] else HD.SNULL 383 header['kstnm'] = stats['station'] if stats['station'] else HD.SNULL 384 header['knetwk'] = stats['network'] if stats['network'] else HD.SNULL 385 header['khole'] = stats['location'] if stats['location'] else HD.SNULL 386 387 header['lpspol'] = True 388 header['lcalda'] = False 389 390 # ObsPy issue 1204 391 header['nvhdr'] = 6 392 header['leven'] = 1 393 header['lovrok'] = 1 394 header['iftype'] = 1 395 396 # ObsPy issue #1317 397 header['npts'] = stats['npts'] 398 header['delta'] = stats['delta'] 399 400 return header 401 402 403def get_sac_reftime(header): 404 """ 405 Get SAC header reference time as a UTCDateTime instance from a SAC header 406 dictionary. 407 408 Builds the reference time from SAC "nz" time fields. Raises 409 :class:`SacHeaderTimeError` if any time fields are null. 410 411 :param header: SAC header 412 :type header: dict 413 414 :rtype: :class:`~obspy.core.UTCDateTime` 415 :returns: SAC reference time. 416 417 """ 418 # NOTE: epoch seconds can be got by: 419 # (reftime - datetime.datetime(1970,1,1)).total_seconds() 420 try: 421 yr = header['nzyear'] 422 nzjday = header['nzjday'] 423 nzhour = header['nzhour'] 424 nzmin = header['nzmin'] 425 nzsec = header['nzsec'] 426 nzmsec = header['nzmsec'] 427 except KeyError as e: 428 # header doesn't have all the keys 429 msg = "Not enough time information: {}".format(e) 430 raise SacHeaderTimeError(msg) 431 432 if 0 <= yr <= 99: 433 warnings.warn(TWO_DIGIT_YEAR_MSG) 434 yr += 1900 435 436 try: 437 reftime = UTCDateTime(year=yr, julday=nzjday, hour=nzhour, 438 minute=nzmin, second=nzsec, 439 microsecond=nzmsec * 1000) 440 except (ValueError, TypeError): 441 msg = "Invalid time headers. May contain null values." 442 raise SacHeaderTimeError(msg) 443 444 return reftime 445