1# -*- coding: utf-8 -*- 2""" 3Routines for handling of Reftek130 packets. 4 5Currently only event header (EH), event trailer (ET) and data (DT) packets are 6handled. These three packets have more or less the same meaning in the first 8 7bytes of the payload which makes the first 24 bytes the so called extended 8header. 9""" 10from __future__ import (absolute_import, division, print_function, 11 unicode_literals) 12from future.builtins import * # NOQA 13from future.utils import native_str 14 15import warnings 16 17import numpy as np 18 19from obspy import UTCDateTime 20from obspy.core.compatibility import from_buffer 21from obspy.io.mseed.headers import clibmseed 22 23from .util import ( 24 _decode_ascii, _parse_long_time, _16_tuple_ascii, _16_tuple_int, 25 _16_tuple_float, bcd, bcd_hex, 26 bcd_julian_day_string_to_nanoseconds_of_year, bcd_16bit_int, bcd_8bit_hex, 27 _get_nanoseconds_for_start_of_year) 28 29 30class Reftek130UnpackPacketError(ValueError): 31 pass 32 33 34PACKET_TYPES_IMPLEMENTED = ("EH", "ET", "DT") 35PACKET_TYPES_NOT_IMPLEMENTED = ("AD", "CD", "DS", "FD", "OM", "SC", "SH") 36PACKET_TYPES = PACKET_TYPES_IMPLEMENTED + PACKET_TYPES_NOT_IMPLEMENTED 37 38 39# The extended header which is the same for EH/ET/DT packets. 40# tuples are: 41# - field name 42# - dtype during initial reading 43# - conversion routine (if any) 44# - dtype after conversion 45PACKET = [ 46 ("packet_type", native_str("|S2"), None, native_str("S2")), 47 ("experiment_number", np.uint8, bcd, np.uint8), 48 ("year", np.uint8, bcd, np.uint8), 49 ("unit_id", (np.uint8, 2), bcd_hex, native_str("S4")), 50 ("time", (np.uint8, 6), bcd_julian_day_string_to_nanoseconds_of_year, 51 np.int64), 52 ("byte_count", (np.uint8, 2), bcd_16bit_int, np.uint16), 53 ("packet_sequence", (np.uint8, 2), bcd_16bit_int, np.uint16), 54 ("event_number", (np.uint8, 2), bcd_16bit_int, np.uint16), 55 ("data_stream_number", np.uint8, bcd, np.uint8), 56 ("channel_number", np.uint8, bcd, np.uint8), 57 ("number_of_samples", (np.uint8, 2), bcd_16bit_int, np.uint32), 58 ("flags", np.uint8, None, np.uint8), 59 ("data_format", np.uint8, bcd_8bit_hex, native_str("S2")), 60 # Temporarily store the payload here. 61 ("payload", (np.uint8, 1000), None, (np.uint8, 1000))] 62 63 64# name, offset, length (bytes) and converter routine for EH/ET packet payload 65EH_PAYLOAD = { 66 "trigger_time_message": (0, 33, _decode_ascii), 67 "time_source": (33, 1, _decode_ascii), 68 "time_quality": (34, 1, _decode_ascii), 69 "station_name_extension": (35, 1, _decode_ascii), 70 "station_name": (36, 4, _decode_ascii), 71 "stream_name": (40, 16, _decode_ascii), 72 "_reserved_2": (56, 8, _decode_ascii), 73 "sampling_rate": (64, 4, int), 74 "trigger_type": (68, 4, _decode_ascii), 75 "trigger_time": (72, 16, _parse_long_time), 76 "first_sample_time": (88, 16, _parse_long_time), 77 "detrigger_time": (104, 16, _parse_long_time), 78 "last_sample_time": (120, 16, _parse_long_time), 79 "channel_adjusted_nominal_bit_weights": (136, 128, _16_tuple_ascii), 80 "channel_true_bit_weights": (264, 128, _16_tuple_ascii), 81 "channel_gain_code": (392, 16, _16_tuple_ascii), 82 "channel_ad_resolution_code": (408, 16, _16_tuple_ascii), 83 "channel_fsa_code": (424, 16, _16_tuple_ascii), 84 "channel_code": (440, 64, _16_tuple_ascii), 85 "channel_sensor_fsa_code": (504, 16, _16_tuple_ascii), 86 "channel_sensor_vpu": (520, 96, _16_tuple_float), 87 "channel_sensor_units_code": (616, 16, _16_tuple_ascii), 88 "station_channel_number": (632, 48, _16_tuple_int), 89 "_reserved_3": (680, 156, _decode_ascii), 90 "total_installed_channels": (836, 2, int), 91 "station_comment": (838, 40, _decode_ascii), 92 "digital_filter_list": (878, 16, _decode_ascii), 93 "position": (894, 26, _decode_ascii), 94 "reftek_120": (920, 80, None)} 95 96 97class Packet(object): 98 _headers = ('experiment_number', 'unit_id', 'byte_count', 99 'packet_sequence', 'time') 100 101 @staticmethod 102 def from_data(data): 103 packet_type = data['packet_type'].decode("ASCII", "ignore") 104 if packet_type in ("EH", "ET"): 105 return EHPacket(data) 106 elif packet_type == "DT": 107 return DTPacket(data) 108 else: 109 msg = "Can not create Reftek packet for packet type '{}'" 110 raise NotImplementedError(msg.format(packet_type)) 111 112 def __init__(self, data): 113 raise NotImplementedError() 114 115 @property 116 def type(self): 117 return self._data['packet_type'].item() 118 119 def __getattr__(self, name): 120 if name in self._headers: 121 return self._data[name].item() 122 123 @property 124 def nanoseconds(self): 125 return self._data['time'].item() 126 127 @property 128 def time(self): 129 return UTCDateTime(ns=self._data['time'].item()) 130 131 132class EHPacket(Packet): 133 __slots__ = ["_data"] + list(EH_PAYLOAD.keys()) 134 _headers = ('packet_sequence', 'experiment_number', 'unit_id', 135 'byte_count', 'time', 'event_number', 'data_stream_number', 136 'data_format', 'flags') 137 138 def __init__(self, data): 139 self._data = data 140 try: 141 payload = self._data["payload"].tobytes() 142 except AttributeError: 143 # for numpy < 1.9.0, does not work for python 3.6 144 payload = bytes(self._data["payload"]) 145 for name, (start, length, converter) in EH_PAYLOAD.items(): 146 data = payload[start:start + length] 147 if converter is not None: 148 data = converter(data) 149 setattr(self, name, data) 150 151 def _to_dict(self): 152 """ 153 Convert to dictionary structure. 154 """ 155 return {key: getattr(self, key) for key in EH_PAYLOAD.keys()} 156 157 def __str__(self, compact=False): 158 if compact: 159 sta = (self.station_name.strip() + 160 self.station_name_extension.strip()) 161 info = ("{:04d} {:2s} {:4s} {:2d} {:4d} {:4d} {:2d} {:2s} " 162 "{:5s} {:4d} {!s}").format( 163 self.packet_sequence, self.type.decode(), 164 self.unit_id.decode(), self.experiment_number, 165 self.byte_count, self.event_number, 166 self.data_stream_number, self.data_format.decode(), 167 sta, self.sampling_rate, self.time) 168 else: 169 info = [] 170 for key in self._headers: 171 value = getattr(self, key) 172 if key in ("unit_id", "data_format"): 173 value = value.decode() 174 info.append("{}: {}".format(key, value)) 175 info.append("-" * 20) 176 for key in sorted(EH_PAYLOAD.keys()): 177 value = getattr(self, key) 178 if key in ("trigger_time", "detrigger_time", 179 "first_sample_time", "last_sample_time"): 180 if value is not None: 181 value = UTCDateTime(ns=value) 182 info.append("{}: {}".format(key, value)) 183 info = "{} Packet\n\t{}".format(self.type.decode(), 184 "\n\t".join(info)) 185 return info 186 187 188class DTPacket(Packet): 189 __slots__ = ["_data"] 190 _headers = ('packet_sequence', 'experiment_number', 'unit_id', 191 'byte_count', 'time', 'event_number', 'data_stream_number', 192 'channel_number', 'number_of_samples', 'data_format', 'flags') 193 194 def __init__(self, data): 195 self._data = data 196 197 def __str__(self, compact=False): 198 if compact: 199 info = ("{:04d} {:2s} {:4s} {:2d} {:4d} {:4d} {:2d} {:2s} " 200 " {:2d} {:4d} {!s}").format( 201 self.packet_sequence, self.type.decode(), 202 self.unit_id.decode(), self.experiment_number, 203 self.byte_count, self.event_number, 204 self.data_stream_number, self.data_format.decode(), 205 self.channel_number, self.number_of_samples, self.time) 206 else: 207 info = [] 208 for key in self._headers: 209 value = getattr(self, key) 210 if key in ("unit_id", "data_format"): 211 value = value.decode() 212 info.append("{}: {}".format(key, value)) 213 info = "{} Packet\n\t{}".format(self.type.decode(), 214 "\n\t".join(info)) 215 return info 216 217 218PACKET_INITIAL_UNPACK_DTYPE = np.dtype([ 219 (native_str(name), dtype_initial) 220 for name, dtype_initial, converter, dtype_final in PACKET]) 221 222PACKET_FINAL_DTYPE = np.dtype([ 223 (native_str(name), dtype_final) 224 for name, dtype_initial, converter, dtype_final in PACKET]) 225 226 227def _initial_unpack_packets(bytestring): 228 """ 229 First unpack data with dtype matching itemsize of storage in the reftek 230 file, than allocate result array with dtypes for storage of python 231 objects/arrays and fill it with the unpacked data. 232 """ 233 if not len(bytestring): 234 return np.array([], dtype=PACKET_FINAL_DTYPE) 235 236 if len(bytestring) % 1024 != 0: 237 tail = len(bytestring) % 1024 238 bytestring = bytestring[:-tail] 239 msg = ("Length of data not a multiple of 1024. Data might be " 240 "truncated. Dropping {:d} byte(s) at the end.").format(tail) 241 warnings.warn(msg) 242 data = from_buffer( 243 bytestring, dtype=PACKET_INITIAL_UNPACK_DTYPE) 244 result = np.empty_like(data, dtype=PACKET_FINAL_DTYPE) 245 246 for name, dtype_initial, converter, dtype_final in PACKET: 247 if converter is None: 248 result[name][:] = data[name][:] 249 else: 250 try: 251 result[name][:] = converter(data[name]) 252 except Exception as e: 253 raise Reftek130UnpackPacketError(str(e)) 254 # time unpacking is special and needs some additional work. 255 # we need to add the POSIX timestamp of the start of respective year to the 256 # already unpacked seconds into the respective year.. 257 result['time'][:] += [_get_nanoseconds_for_start_of_year(y) 258 for y in result['year']] 259 return result 260 261 262def _unpack_C0_C2_data(packets, encoding): # noqa 263 """ 264 Unpacks sample data from a packet array that uses 'C0' or 'C2' data 265 encoding. 266 267 :type packets: :class:`numpy.ndarray` (dtype ``PACKET_FINAL_DTYPE``) 268 :param packets: Array of data packets (``packet_type`` ``'DT'``) from which 269 to unpack the sample data (with data encoding 'C0' or 'C2'). 270 :type encoding: str 271 :param encoding: Reftek data encoding as specified in event header (EH) 272 packet, either ``'C0'`` or ``'C2'``. 273 """ 274 if encoding == 'C0': 275 encoding_bytes = b'C0' 276 elif encoding == 'C2': 277 encoding_bytes = b'C2' 278 else: 279 msg = "Unregonized encoding: '{}'".format(encoding) 280 raise ValueError(msg) 281 if np.any(packets['data_format'] != encoding_bytes): 282 differing_formats = np.unique( 283 packets[packets['data_format'] != 284 encoding_bytes]['data_format']).tolist() 285 msg = ("Using '{}' data format unpacking routine but some packet(s) " 286 "specify other data format(s): {}".format(encoding, 287 differing_formats)) 288 warnings.warn(msg) 289 # if the packet array is contiguous in memory (which it generally should 290 # be), we can work with the memory address of the first packed data byte 291 # and advance it by a fixed offset when moving from one packet to the next 292 if packets.flags['C_CONTIGUOUS'] and packets.flags['F_CONTIGUOUS']: 293 return _unpack_C0_C2_data_fast(packets, encoding) 294 # if the packet array is *not* contiguous in memory, fall back to slightly 295 # slower unpacking with looking up the memory position of the first packed 296 # byte in each packet individually. 297 else: 298 return _unpack_C0_C2_data_safe(packets, encoding) 299 300 301def _unpack_C0_C2_data_fast(packets, encoding): # noqa 302 """ 303 Unpacks sample data from a packet array that uses 'C0' or 'C2' data 304 encoding. 305 306 Unfortunately the whole data cannot be unpacked with one call to 307 libmseed as some payloads do not take the full 960 bytes. They are 308 thus padded which would results in padded pieces directly in a large 309 array and libmseed (understandably) does not support that. 310 311 Thus we resort to *tada* pointer arithmetics in Python ;-) This is 312 quite a bit faster then correctly casting to an integer pointer so 313 it's worth it. 314 315 Also avoid a data copy. 316 317 Writing this directly in C would be about 3 times as fast so it might 318 be worth it. 319 320 :type packets: :class:`numpy.ndarray` (dtype ``PACKET_FINAL_DTYPE``) 321 :param packets: Array of data packets (``packet_type`` ``'DT'``) from which 322 to unpack the sample data (with data encoding 'C0' or 'C2'). 323 :type encoding: str 324 :param encoding: Reftek data encoding as specified in event header (EH) 325 packet, either ``'C0'`` or ``'C2'``. 326 """ 327 if encoding == 'C0': 328 decode_steim = clibmseed.msr_decode_steim1 329 elif encoding == 'C2': 330 decode_steim = clibmseed.msr_decode_steim2 331 else: 332 msg = "Unregonized encoding: '{}'".format(encoding) 333 raise ValueError(msg) 334 npts = packets["number_of_samples"].sum() 335 unpacked_data = np.empty(npts, dtype=np.int32) 336 pos = 0 337 s = packets[0]["payload"][40:].ctypes.data 338 if len(packets) > 1: 339 offset = ( 340 packets[1]["payload"][40:].ctypes.data - s) 341 else: 342 offset = 0 343 for _npts in packets["number_of_samples"]: 344 decode_steim( 345 s, 960, _npts, unpacked_data[pos:], _npts, None, 346 1) 347 pos += _npts 348 s += offset 349 return unpacked_data 350 351 352def _unpack_C0_C2_data_safe(packets, encoding): # noqa 353 """ 354 Unpacks sample data from a packet array that uses 'C0' or 'C2' data 355 encoding. 356 357 If the packet array is *not* contiguous in memory, fall back to slightly 358 slower unpacking with looking up the memory position of the first packed 359 byte in each packet individually. 360 361 :type packets: :class:`numpy.ndarray` (dtype ``PACKET_FINAL_DTYPE``) 362 :param packets: Array of data packets (``packet_type`` ``'DT'``) from which 363 to unpack the sample data (with data encoding 'C0' or 'C2'). 364 :type encoding: str 365 :param encoding: Reftek data encoding as specified in event header (EH) 366 packet, either ``'C0'`` or ``'C2'``. 367 """ 368 if encoding == 'C0': 369 decode_steim = clibmseed.msr_decode_steim1 370 elif encoding == 'C2': 371 decode_steim = clibmseed.msr_decode_steim2 372 else: 373 msg = "Unregonized encoding: '{}'".format(encoding) 374 raise ValueError(msg) 375 npts = packets["number_of_samples"].sum() 376 unpacked_data = np.empty(npts, dtype=np.int32) 377 pos = 0 378 # Sample data starts at byte 40 in the DT packet's 379 # payload. 380 for p in packets: 381 _npts = p["number_of_samples"] 382 decode_steim( 383 p["payload"][40:].ctypes.data, 960, _npts, 384 unpacked_data[pos:], _npts, None, 1) 385 pos += _npts 386 return unpacked_data 387 388 389if __name__ == '__main__': 390 import doctest 391 doctest.testmod(exclude_empty=True) 392