| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
import numpy as np
class RingBuffer:
    """Fixed-capacity circular buffer of ``float64`` samples.

    Values are written into a preallocated NumPy array. ``_delimiter`` is the
    index of the next slot to write; once it wraps around to 0 the buffer is
    flagged as full and every further write overwrites the oldest sample.

    The statistics (`mean_value`, `variance`, `std_dev`) are only computed
    once the buffer is full; before that they evaluate to ``nan``.

    NOTE(review): the ``c_``-prefixed methods presumably mirror the C-level
    methods of a Cython implementation; the unprefixed methods and properties
    simply delegate to them. Confirm before removing either set.
    """

    def __init__(self, length: int = 20):
        """Create an empty buffer able to hold ``length`` samples."""
        self._length = length
        self._buffer = np.zeros(length, dtype=np.float64)
        self._delimiter = 0  # index of the next slot to be written
        self._is_full = False

    def __dealloc__(self):
        """Drop the reference to the backing array.

        NOTE(review): ``__dealloc__`` is not part of the Python data model, so
        the interpreter never calls it automatically; presumably carried over
        from a Cython extension type. Kept so explicit callers keep working.
        """
        self._buffer = None

    def c_add_value(self, value) -> None:
        """Store ``value`` in the next slot and advance the write position."""
        self._buffer[self._delimiter] = value
        self.c_increment_delimiter()

    def c_increment_delimiter(self) -> None:
        """Advance the write position, marking the buffer full on first wrap."""
        self._delimiter = (self._delimiter + 1) % self._length
        # Wrapping back to slot 0 means every slot has been written once.
        if not self._is_full and self._delimiter == 0:
            self._is_full = True

    def c_is_empty(self) -> bool:
        """Return ``True`` when no value has been added yet."""
        return (not self._is_full) and self._delimiter == 0

    def c_get_last_value(self):
        """Return the most recently added value, or ``nan`` if empty."""
        if self.c_is_empty():
            return np.nan
        # When the write position is 0 on a full buffer, index -1 correctly
        # resolves to the last slot, which holds the newest sample.
        return self._buffer[self._delimiter - 1]

    def c_is_full(self) -> bool:
        """Return ``True`` once the buffer has been filled at least once."""
        return self._is_full

    def c_mean_value(self):
        """Return the mean of the stored values, or ``nan`` if not yet full."""
        if not self._is_full:
            return np.nan
        return np.mean(self.c_get_as_numpy_array())

    def c_variance(self):
        """Return ``np.var`` of the stored values, or ``nan`` if not yet full."""
        if not self._is_full:
            return np.nan
        return np.var(self.c_get_as_numpy_array())

    def c_std_dev(self):
        """Return ``np.std`` of the stored values, or ``nan`` if not yet full."""
        if not self._is_full:
            return np.nan
        return np.std(self.c_get_as_numpy_array())

    def c_get_as_numpy_array(self) -> np.ndarray:
        """Return a copy of the stored values ordered oldest to newest.

        Before the buffer is full only the slots written so far are returned.
        """
        storage = np.asarray(self._buffer)
        split = self._delimiter
        if not self._is_full:
            return storage[:split].copy()
        # The oldest sample sits at the write position. Plain slices are used
        # rather than a fixed-width integer index array so that buffers of any
        # length are handled correctly.
        return np.concatenate((storage[split:], storage[:split]))

    def add_value(self, val) -> None:
        """Append ``val``, overwriting the oldest sample when full."""
        self.c_add_value(val)

    def get_as_numpy_array(self) -> np.ndarray:
        """Return a copy of the stored values ordered oldest to newest."""
        return self.c_get_as_numpy_array()

    def get_last_value(self):
        """Return the most recently added value, or ``nan`` if empty."""
        return self.c_get_last_value()

    @property
    def is_full(self) -> bool:
        """Whether the buffer has been filled at least once."""
        return self.c_is_full()

    @property
    def mean_value(self):
        """Mean of the stored values (``nan`` until the buffer is full)."""
        return self.c_mean_value()

    @property
    def std_dev(self):
        """``np.std`` of the stored values (``nan`` until the buffer is full)."""
        return self.c_std_dev()

    @property
    def variance(self):
        """``np.var`` of the stored values (``nan`` until the buffer is full)."""
        return self.c_variance()

    @property
    def length(self) -> int:
        """Capacity of the buffer in samples."""
        return self._length

    @length.setter
    def length(self, value) -> None:
        """Resize the buffer to ``value`` slots, keeping the newest samples.

        At most ``value`` of the most recent samples survive, in their
        original order. Resizing to 0 discards everything.
        """
        data = self.get_as_numpy_array()
        # Allocate first: an invalid size raises here, before any state is
        # modified, so the buffer is never left half-updated.
        resized = np.zeros(value, dtype=np.float64)
        # An explicit start index is used because ``data[-value:]`` would keep
        # *all* samples when ``value`` is 0.
        kept = data[max(len(data) - value, 0):]
        count = len(kept)
        resized[:count] = kept

        self._length = value
        self._buffer = resized
        self._is_full = count > 0 and count == value
        # A completely filled buffer wraps the write position back to slot 0.
        self._delimiter = 0 if self._is_full else count
|