ring_buffer.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import numpy as np
  2. class RingBuffer:
  3. def __init__(self, length=20):
  4. self._length = length
  5. self._buffer = np.zeros(length, dtype=np.float64)
  6. self._delimiter = 0
  7. self._is_full = False
  8. def __dealloc__(self):
  9. self._buffer = None
  10. def c_add_value(self, value):
  11. self._buffer[self._delimiter] = value
  12. self.c_increment_delimiter()
  13. def c_increment_delimiter(self):
  14. self._delimiter = (self._delimiter + 1) % self._length
  15. if not self._is_full and self._delimiter == 0:
  16. self._is_full = True
  17. def c_is_empty(self):
  18. return (not self._is_full) and (0 == self._delimiter)
  19. def c_get_last_value(self):
  20. if self.c_is_empty():
  21. return np.nan
  22. return self._buffer[self._delimiter - 1]
  23. def c_is_full(self):
  24. return self._is_full
  25. def c_mean_value(self):
  26. result = np.nan
  27. if self._is_full:
  28. result = np.mean(self.c_get_as_numpy_array())
  29. return result
  30. def c_variance(self):
  31. result = np.nan
  32. if self._is_full:
  33. result = np.var(self.c_get_as_numpy_array())
  34. return result
  35. def c_std_dev(self):
  36. result = np.nan
  37. if self._is_full:
  38. result = np.std(self.c_get_as_numpy_array())
  39. return result
  40. def c_get_as_numpy_array(self):
  41. if not self._is_full:
  42. indexes = np.arange(0, stop=self._delimiter, dtype=np.int16)
  43. else:
  44. indexes = np.arange(self._delimiter, stop=self._delimiter + self._length,
  45. dtype=np.int16) % self._length
  46. return np.asarray(self._buffer)[indexes]
  47. def add_value(self, val):
  48. self.c_add_value(val)
  49. def get_as_numpy_array(self):
  50. return self.c_get_as_numpy_array()
  51. def get_last_value(self):
  52. return self.c_get_last_value()
  53. @property
  54. def is_full(self):
  55. return self.c_is_full()
  56. @property
  57. def mean_value(self):
  58. return self.c_mean_value()
  59. @property
  60. def std_dev(self):
  61. return self.c_std_dev()
  62. @property
  63. def variance(self):
  64. return self.c_variance()
  65. @property
  66. def length(self) -> int:
  67. return self._length
  68. @length.setter
  69. def length(self, value):
  70. data = self.get_as_numpy_array()
  71. self._length = value
  72. self._buffer = np.zeros(value, dtype=np.float64)
  73. self._delimiter = 0
  74. self._is_full = False
  75. for val in data[-value:]:
  76. self.add_value(val)