1# -*- coding: utf-8 -*- 2""" 3Data extraction and transfer from miniSEED files 4""" 5from __future__ import (absolute_import, division, print_function, 6 unicode_literals) 7from future.builtins import * # NOQA 8from future.utils import with_metaclass 9 10import abc 11import bisect 12import ctypes 13import logging 14import os 15import re 16from collections import namedtuple 17from io import BytesIO 18 19from obspy import read, Stream, UTCDateTime 20from obspy.clients.filesystem.msriterator import _MSRIterator 21 22 23logger = logging.getLogger('obspy.clients.filesystem.miniseed') 24 25 26class ObsPyClientFileSystemMiniSEEDException(Exception): 27 pass 28 29 30class NoDataError(ObsPyClientFileSystemMiniSEEDException): 31 """ 32 Error raised when no data is found 33 """ 34 pass 35 36 37class RequestLimitExceededError(ObsPyClientFileSystemMiniSEEDException): 38 """ 39 Error raised when the amount of data exceeds the configured limit 40 """ 41 pass 42 43 44class _ExtractedDataSegment(with_metaclass(abc.ABCMeta)): 45 """ 46 There are a few different forms that a chunk of extracted data can take, 47 so we return a wrapped object that exposes a simple, consistent API 48 for the handler to use. 49 """ 50 @abc.abstractmethod 51 def get_num_bytes(self): # pragma: no cover 52 """ 53 Return the number of bytes in the segment 54 """ 55 raise NotImplementedError() 56 57 @abc.abstractmethod 58 def get_src_name(self): # pragma: no coveg 59 """ 60 Return the name of the data source 61 """ 62 raise NotImplementedError() 63 64 65class _MSRIDataSegment(_ExtractedDataSegment): 66 """ 67 Segment of data from a _MSRIterator 68 """ 69 def __init__(self, msri, sample_rate, start_time, end_time, src_name, 70 loglevel="WARNING"): 71 """ 72 :param msri: A `_MSRIterator` 73 :param sample_rate: Sample rate of the data 74 :param start_time: A `UTCDateTime` giving the start of the 75 requested data 76 :param end_time: A `UTCDateTime` giving the end of the requested data 77 :param src_name: Name of the data source for logging 78 :type loglevel: str 79 :param loglevel: logging verbosity 80 """ 81 numeric_level = getattr(logging, loglevel.upper(), None) 82 if not isinstance(numeric_level, int): 83 raise ValueError('Invalid log level: %s' % loglevel) 84 logging.basicConfig(level=numeric_level) 85 logger.setLevel(numeric_level) 86 87 self.msri = msri 88 self.sample_rate = sample_rate 89 self.start_time = start_time 90 self.end_time = end_time 91 self.src_name = src_name 92 93 def read_stream(self): 94 msrstart = self.msri.get_startepoch() 95 msrend = self.msri.get_endepoch() 96 reclen = self.msri.msr.contents.reclen 97 98 sepoch = self.start_time.timestamp 99 eepoch = self.end_time.timestamp 100 101 st = Stream() 102 # Process records that intersect with request time window 103 if msrstart < eepoch and msrend > sepoch: 104 105 # Trim record if coverage and partial overlap with request 106 if self.sample_rate > 0 and (msrstart < self.start_time or 107 msrend > self.end_time): 108 logger.debug("Trimming record %s @ %s" % 109 (self.src_name, self.msri.get_starttime())) 110 tr = \ 111 read(BytesIO(ctypes.string_at( 112 self.msri.msr.contents.record, 113 reclen)), 114 format="MSEED")[0] 115 tr.trim(self.start_time, self.end_time) 116 st.traces.append(tr) 117 return st 118 # Otherwise, write un-trimmed record 119 else: 120 # Construct to avoid copying the data, supposedly 121 logger.debug("Writing full record %s @ %s" % 122 (self.src_name, self.msri.get_starttime())) 123 out = (ctypes.c_char * reclen).from_address( 124 ctypes.addressof(self.msri.msr.contents.record.contents)) 125 data = BytesIO(out.raw) 126 st = read(data, format="MSEED") 127 return st 128 129 def get_num_bytes(self): 130 return self.msri.msr.contents.reclen 131 132 def get_src_name(self): 133 return self.src_name 134 135 136class _FileDataSegment(_ExtractedDataSegment): 137 """ 138 Segment of data that comes directly from a data file 139 """ 140 def __init__(self, filename, start_byte, num_bytes, src_name): 141 """ 142 :param filename: Name of data file 143 :param start_byte: Return data starting from this offset 144 :param num_bytes: Length of data to return 145 :param src_name: Name of the data source for logging 146 """ 147 self.filename = filename 148 self.start_byte = start_byte 149 self.num_bytes = num_bytes 150 self.src_name = src_name 151 152 def read_stream(self): 153 st = Stream() 154 with open(self.filename, "rb") as f: 155 f.seek(self.start_byte) 156 raw_data = BytesIO(f.read(self.num_bytes)) 157 st = read(raw_data, format="MSEED") 158 return st 159 160 def get_num_bytes(self): 161 return self.num_bytes 162 163 def get_src_name(self): 164 return self.src_name 165 166 167class _MiniseedDataExtractor(object): 168 """ 169 Component for extracting, trimming, and validating data. 170 """ 171 def __init__(self, dp_replace=None, request_limit=0, loglevel="WARNING"): 172 """ 173 :param dp_replace: optional tuple of (regex, replacement) indicating 174 the location of data files. If regex is omitted, then the replacement 175 string is appended to the beginning of the file name. 176 :param request_limit: optional limit (in bytes) on how much data can 177 be extracted at once 178 :type loglevel: str 179 :param loglevel: logging verbosity 180 """ 181 self.loglevel = loglevel 182 numeric_level = getattr(logging, loglevel.upper(), None) 183 if not isinstance(numeric_level, int): 184 raise ValueError('Invalid log level: %s' % loglevel) 185 logging.basicConfig(level=numeric_level) 186 logger.setLevel(numeric_level) 187 188 if dp_replace: 189 self.dp_replace_re = re.compile(dp_replace[0]) 190 self.dp_replace_sub = dp_replace[1] 191 else: 192 self.dp_replace_re = None 193 self.dp_replace_sub = None 194 self.request_limit = request_limit 195 196 def handle_trimming(self, stime, etime, nrow): 197 """ 198 Get the time & byte-offsets for the data in time range (stime, etime). 199 200 This is done by finding the smallest section of the data in row that 201 falls within the desired time range and is identified by the timeindex 202 field of row. 203 204 :returns: [(start time, start offset, trim_boolean), 205 (end time, end offset, trim_boolean)] 206 """ 207 etime = UTCDateTime(nrow.requestend) 208 row_stime = UTCDateTime(nrow.starttime) 209 row_etime = UTCDateTime(nrow.endtime) 210 211 # If we need a subset of the this block, trim it accordingly 212 block_start = int(nrow.byteoffset) 213 block_end = block_start + int(nrow.bytes) 214 if stime > row_stime or etime < row_etime: 215 tix = [x.split("=>") for x in nrow.timeindex.split(",")] 216 if tix[-1][0] == 'latest': 217 tix[-1] = [str(row_etime.timestamp), block_end] 218 to_x = [float(x[0]) for x in tix] 219 s_index = bisect.bisect_right(to_x, stime.timestamp) - 1 220 if s_index < 0: 221 s_index = 0 222 e_index = bisect.bisect_right(to_x, etime.timestamp) 223 off_start = int(tix[s_index][1]) 224 if e_index >= len(tix): 225 e_index = -1 226 off_end = int(tix[e_index][1]) 227 return ([to_x[s_index], off_start, stime > row_stime], 228 [to_x[e_index], off_end, etime < row_etime],) 229 else: 230 return ([row_stime.timestamp, block_start, False], 231 [row_etime.timestamp, block_end, False]) 232 233 def extract_data(self, index_rows): 234 """ 235 Perform the data extraction. 236 237 :param index_rows: requested data, as produced by 238 `HTTPServer_RequestHandler.fetch_index_rows` 239 :yields: sequence of `_ExtractedDataSegment`s 240 """ 241 242 # Pre-scan the index rows: 243 # 1) Build processed list for extraction 244 # 2) Check if the request is small enough to satisfy 245 # Note: accumulated estimate of output bytes will be equal to or 246 # higher than actual output 247 total_bytes = 0 248 request_rows = [] 249 Request = namedtuple('Request', ['srcname', 'filename', 'starttime', 250 'endtime', 'triminfo', 'bytes', 251 'samplerate']) 252 try: 253 for nrow in index_rows: 254 srcname = "_".join(nrow[:4]) 255 filename = nrow.filename 256 logger.debug("EXTRACT: src=%s, file=%s, bytes=%s, rate:%s" % 257 (srcname, filename, nrow.bytes, nrow.samplerate)) 258 259 starttime = UTCDateTime(nrow.requeststart) 260 endtime = UTCDateTime(nrow.requestend) 261 triminfo = self.handle_trimming(starttime, endtime, nrow) 262 total_bytes += triminfo[1][1] - triminfo[0][1] 263 if self.request_limit > 0 and total_bytes > self.request_limit: 264 raise RequestLimitExceededError( 265 "Result exceeds limit of %d bytes" % 266 self.request_limit) 267 filename = os.path.normpath(filename).replace("\\", "/") 268 if self.dp_replace_re and self.dp_replace_sub: 269 filename = self.dp_replace_re.sub( 270 self.dp_replace_sub.replace("\\", "/"), filename) 271 filename = os.path.normpath(filename) 272 if not os.path.exists(filename): 273 raise Exception("Data file does not exist: %s" % filename) 274 request_rows.append(Request(srcname=srcname, 275 filename=filename, 276 starttime=starttime, 277 endtime=endtime, 278 triminfo=triminfo, 279 bytes=nrow.bytes, 280 samplerate=nrow.samplerate)) 281 logger.debug("EXTRACT: src=%s, file=%s, bytes=%s, rate:%s" % 282 (srcname, filename, nrow.bytes, nrow.samplerate)) 283 except Exception as err: 284 raise Exception("Error accessing data index: %s" % str(err)) 285 286 # Error if request matches no data 287 if total_bytes == 0: 288 raise NoDataError() 289 290 # Get & return the actual data 291 for nrow in request_rows: 292 logger.debug("Extracting %s (%s - %s) from %s" % (nrow.srcname, 293 nrow.starttime, 294 nrow.endtime, 295 nrow.filename)) 296 297 # Iterate through records in section 298 # if only part of the section is needed 299 if nrow.triminfo[0][2] or nrow.triminfo[1][2]: 300 301 for msri in _MSRIterator(filename=nrow.filename, 302 startoffset=nrow.triminfo[0][1], 303 dataflag=False): 304 offset = msri.get_offset() 305 306 # Done if we are beyond end offset 307 if offset >= nrow.triminfo[1][1]: 308 break 309 310 yield _MSRIDataSegment(msri, 311 nrow.samplerate, 312 nrow.starttime, 313 nrow.endtime, 314 nrow.srcname, 315 loglevel=self.loglevel) 316 317 # Check for passing end offset 318 if (offset + msri.msr.contents.reclen) >= \ 319 nrow.triminfo[1][1]: 320 break 321 322 # Otherwise, return the entire section 323 else: 324 yield _FileDataSegment(nrow.filename, nrow.triminfo[0][1], 325 nrow.bytes, nrow.srcname) 326 327 328if __name__ == '__main__': 329 import doctest 330 doctest.testmod(exclude_empty=True) 331