1# -*- coding: utf-8 -*- 2""" 3Module for handling ObsPy RtMemory objects. 4 5:copyright: 6 The ObsPy Development Team (devs@obspy.org) & Anthony Lomax 7:license: 8 GNU Lesser General Public License, Version 3 9 (https://www.gnu.org/copyleft/lesser.html) 10""" 11from __future__ import (absolute_import, division, print_function, 12 unicode_literals) 13from future.builtins import * # NOQA 14 15import numpy as np 16 17 18class RtMemory: 19 """ 20 Real time memory class. 21 """ 22 def __init__(self): 23 self.initialized = False 24 25 def initialize(self, data_type, length_input, length_output, 26 input_initial_value=0, output_initial_value=0): 27 """ 28 Create and initialize input and output arrays for this RtMemory object. 29 30 :type data_type: numpy.dtype 31 :param data_type: Desired array data-type. 32 :type length_input: int 33 :param length_input: length of the input memory array. 34 :type length_output: int 35 :param length_output: length of the output memory array. 36 :type input_initial_value: float, optional 37 :param input_initial_value: Initialization value for the input 38 memory array (default is 1.0). 39 :type output_initial_value: float, optional 40 :param output_initial_value: Initialization value for the output 41 memory array (default is 1.0). 42 """ 43 self.input = np.empty(length_input, data_type) 44 self.input.fill(input_initial_value) 45 46 self.output = np.empty(length_output, data_type) 47 self.output.fill(output_initial_value) 48 49 self.initialized = True 50 51 def _update(self, memory_array, data): 52 """ 53 Update specified memory array using specified number of points from 54 end of specified data array. 55 56 :type memory_array: numpy.ndarray 57 :param memory_array: Memory array (input or output) in this 58 RtMemory object to update. 59 :type data: numpy.ndarray 60 :param data: Data array to use for update. 61 :return: NumPy :class:`~numpy.ndarray` object containing updated 62 memory array (input or output). 63 """ 64 if data.size >= np.size(memory_array): 65 # data length greater than or equal to memory length 66 memory_array = data[np.size(data) - np.size(memory_array):] 67 else: 68 # data length less than memory length 69 # shift memory 70 memory_array = memory_array[data.size:] 71 # append data 72 memory_array = np.concatenate((memory_array, data)) 73 return memory_array 74 75 def update_output(self, data): 76 """ 77 Update output memory using specified number of points from end of 78 specified array. 79 80 :type data: numpy.ndarray 81 :param data: Data array to use for update. 82 """ 83 self.output = self._update(self.output, data) 84 85 def update_input(self, data): 86 """ 87 Update input memory using specified number of points from end of 88 specified array. 89 90 :type data: numpy.ndarray 91 :param data: Data array to use for update. 92 """ 93 self.input = self._update(self.input, data) 94