import time

import numpy as np
from loguru import logger

import utils

# Size of the default price-coefficient list (the original default had 99 entries).
_DEFAULT_ALPHA_LEN = 99
# While fewer than this many updates have been processed, a smaller smoothing
# factor is used so the spread estimate adapts quickly right after start-up.
_WARMUP_LOOPS = 100
_WARMUP_GAMMA = 0.9


class Predictor:
    """Estimate reference prices from other markets, corrected by a smoothed spread.

    For every incoming market-data row the predictor computes the mid price of
    the traded market and of each reference market, and keeps an exponentially
    smoothed spread ``ref_mid * alpha - trade_mid`` per reference market.
    ``Get_ref`` subtracts that spread from scaled reference quotes.

    Row layout is defined by ``utils``: the traded market occupies the first
    ``utils.LEN`` fields and reference market ``i`` starts at
    ``utils.LEN * (1 + i)``.
    NOTE(review): ``utils.BP_INDEX`` / ``utils.AP_INDEX`` are presumably the
    best-bid / best-ask offsets inside one market's fields -- confirm in utils.
    """

    def __init__(self, ref_name=None, alpha=None, gamma=0.999):
        """Create a predictor.

        Args:
            ref_name: One entry per reference market; only its length is used.
                Defaults to a single ``"Unknown Market"`` entry.
            alpha: Per-reference-market price coefficients, indexed like the
                reference markets. Defaults to all ``1.0``.
            gamma: Smoothing factor applied to the previous spread once the
                warm-up phase is over.
        """
        # ``None`` sentinels avoid sharing one mutable default list between
        # instances.
        if ref_name is None:
            ref_name = ["Unknown Market"]

        self.loop = 0  # number of rows accepted by onTime
        self.arr = []  # most recent raw rows
        self.trade_mp_series = []  # traded-market mid prices
        self.ref_mp_series = []  # per-row list of reference mid prices
        self.ref_num = len(ref_name)

        # Pricing: maximum number of entries kept in each buffer.
        self.window = 10

        # Price coefficients. The default covers at least as many markets as
        # before (99) and grows when more reference markets are configured.
        if alpha is None:
            alpha = [1.0] * max(_DEFAULT_ALPHA_LEN, self.ref_num)
        self.alpha = alpha

        print('定价系数:', gamma)
        self.gamma = gamma
        # ``None`` means "no spread observed yet" for that reference market.
        self.avg_spread = [None] * self.ref_num

    @staticmethod
    def _mid_price(row, market):
        """Return the mid price of market slot ``market`` (0 = traded market) in ``row``."""
        offset = utils.LEN * market
        return (row[offset + utils.BP_INDEX] + row[offset + utils.AP_INDEX]) * 0.5

    def processer(self):
        """Process the newest row in ``self.arr``.

        Appends the traded-market mid price and the list of reference mid
        prices to their series, then updates the smoothed spreads.
        """
        latest = self.arr[-1]

        # Update the traded-market mid price.
        self.trade_mp_series.append(self._mid_price(latest, 0))

        # Update the reference-market mid prices.
        self.ref_mp_series.append(
            [self._mid_price(latest, 1 + i) for i in range(self.ref_num)]
        )

        # Update the spread estimate.
        self._update_avg_spread()

    def _update_avg_spread(self):
        """Update the exponentially smoothed spread of every reference market."""
        # Right after start-up gamma must not be too large, otherwise the
        # estimate would stay close to its first sample for a long time.
        gamma = _WARMUP_GAMMA if self.loop < _WARMUP_LOOPS else self.gamma

        trade_mp = self.trade_mp_series[-1]
        latest_ref_mp = self.ref_mp_series[-1]
        for i in range(self.ref_num):
            bias = latest_ref_mp[i] * self.alpha[i] - trade_mp
            previous = self.avg_spread[i]
            if previous is None:
                # First observation seeds the average.
                self.avg_spread[i] = bias
            else:
                self.avg_spread[i] = previous * gamma + bias * (1 - gamma)

    def check_length(self):
        """Trim the market-data buffers to at most ``self.window`` entries."""
        for series in (self.arr, self.trade_mp_series, self.ref_mp_series):
            excess = len(series) - self.window
            if excess > 0:
                # Drop the oldest entries; removes exactly one per call in the
                # normal one-append-per-update flow.
                del series[:excess]

    # @utils.timeit
    def onTime(self, data):
        """Feed one market-data row into the predictor.

        Args:
            data: A non-empty ``list`` holding the traded market's fields
                followed by each reference market's fields. Anything that is
                not a list, or an empty list, is reported via ``print`` and
                ignored.
        """
        if not isinstance(data, list):
            print("行情数据为None")
            return
        if not data:
            print("行情数据为空")
            return

        self.loop += 1
        self.arr.append(data)
        self.processer()
        self.check_length()

    # @utils.timeit
    def Get_ref(self, ref_ticker):
        """Return spread-adjusted reference quotes.

        Args:
            ref_ticker: Sequence with one ``[first, second]`` price pair per
                reference market (the demo script passes values read at
                ``utils.BP_INDEX`` and ``utils.AP_INDEX``).

        Returns:
            A list with one ``[first, second]`` pair per reference market,
            each computed as ``price * alpha[i] - avg_spread[i]``.

        Raises:
            TypeError: If called before the first successful ``onTime`` update,
                because the spread of a market is still ``None``.
        """
        ref_mid = []
        for i in range(self.ref_num):
            scale = self.alpha[i]
            spread = self.avg_spread[i]
            ref_mid.append([
                ref_ticker[i][0] * scale - spread,
                ref_ticker[i][1] * scale - spread,
            ])
        return ref_mid


if __name__ == "__main__":
    import pandas as pd
    import matplotlib

    # Select the backend before pyplot is imported.
    matplotlib.use('TkAgg')
    import matplotlib.pyplot as plt

    def line_data_to_tickers(data, ref_num):
        """Extract one [BP_INDEX value, AP_INDEX value] pair per reference market from a row."""
        return [
            [
                data[utils.LEN * (i + 1) + utils.BP_INDEX],
                data[utils.LEN * (i + 1) + utils.AP_INDEX],
            ]
            for i in range(ref_num)
        ]

    arr = pd.read_csv('history/ftm_usdt_binance_usdt_swap.csv').values.tolist()

    # The first utils.LEN fields belong to the traded market; the rest are
    # reference markets.
    ref_num = len(arr[0]) // utils.LEN - 1
    p = Predictor(ref_name=["unkwon" for _ in range(ref_num)])

    t = []
    # Reference market to plot; requires at least two reference markets.
    ref_index = 1
    for data in arr:
        p.onTime(data)
        trade_mp = (data[utils.BP_INDEX] + data[utils.AP_INDEX]) * 0.5
        ref_price = p.Get_ref(line_data_to_tickers(data, ref_num))
        t.append([
            trade_mp,
            (ref_price[ref_index][0] + ref_price[ref_index][1]) * 0.5,
        ])

    t = pd.DataFrame(t, columns=['mp', 'ref'])

    plt.figure()
    plt.plot(t['mp'], 'k')
    plt.plot(t['ref'], 'g')
    plt.grid()
    plt.show()