| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- import time
- import utils
- import numpy as np
class Predictor:
    """Derive bias-corrected reference quotes for a traded market.

    Every tick row fed to :meth:`onTime` is a flat list.  The traded market's
    two prices sit at ``utils.BP_INDEX`` / ``utils.AP_INDEX``; reference
    market ``i`` uses the same two offsets shifted by ``utils.LEN * (1 + i)``.
    (BP/AP are presumably best bid / best ask -- confirm against ``utils``.)

    For each reference market the predictor keeps an exponentially weighted
    average of ``ref_mid * alpha[i] - trade_mid`` in ``avg_spread[i]``.
    :meth:`Get_ref` subtracts that average from a scaled reference quote.
    """

    def __init__(self, ref_name=None, alpha=None, gamma=0.999):
        """Create a predictor.

        Args:
            ref_name: One entry per reference market; only its length is
                used.  Defaults to a single ``"Unknown Market"`` entry.
            alpha: Per-reference multiplier applied to reference prices.
                Defaults to a fresh list of ``1.0`` values.
            gamma: Weight given to the previous average once the warm-up
                phase (first 99 ticks) is over.
        """
        # The defaults are built per call: a list literal in the signature
        # would be one shared object across every instance.
        if ref_name is None:
            ref_name = ["Unknown Market"]

        self.loop = 0               # number of non-empty ticks processed
        self.arr = []               # raw tick rows
        self.trade_mp_series = []   # traded-market mid prices
        self.ref_mp_series = []     # per-tick lists of reference mid prices
        self.ref_num = len(ref_name)

        # Maximum number of entries kept in each of the three buffers above.
        self.window = 10

        if alpha is None:
            # The historical default was 99 unit coefficients.  Keep at least
            # that many, but never fewer than the number of references, so
            # indexing by reference position cannot run off the end.
            alpha = [1.0] * max(self.ref_num, 99)
        self.alpha = alpha

        print('定价系数:', gamma)
        self.gamma = gamma
        # ``None`` marks "no observation yet" for a reference market.
        self.avg_spread = [None] * self.ref_num

    @staticmethod
    def _mid(row, offset):
        """Return the mid price of the quote that starts at ``offset``."""
        return (row[offset + utils.BP_INDEX] + row[offset + utils.AP_INDEX]) * 0.5

    def processer(self):
        """Fold the newest row of ``self.arr`` into the derived series.

        Appends the traded-market mid price and the list of reference mid
        prices, then refreshes the averaged spreads.
        """
        latest = self.arr[-1]

        # Traded market occupies the first slot of the row.
        self.trade_mp_series.append(self._mid(latest, 0))

        # Reference markets follow, one ``utils.LEN``-wide slot each.
        self.ref_mp_series.append(
            [self._mid(latest, utils.LEN * (1 + i)) for i in range(self.ref_num)]
        )

        self._update_avg_spread()

    def _update_avg_spread(self):
        """Update the exponentially weighted spread of every reference."""
        # During start-up use a smaller smoothing weight so the average
        # follows the first observations quickly.
        gamma = 0.9 if self.loop < 100 else self.gamma

        trade_mp = self.trade_mp_series[-1]
        ref_mps = self.ref_mp_series[-1]
        for i in range(self.ref_num):
            bias = ref_mps[i] * self.alpha[i] - trade_mp
            previous = self.avg_spread[i]
            if previous is None:
                # First observation seeds the average directly.
                self.avg_spread[i] = bias
            else:
                self.avg_spread[i] = previous * gamma + bias * (1 - gamma)

    def check_length(self):
        """Drop the oldest entry of any buffer longer than ``self.window``."""
        for series in (self.arr, self.trade_mp_series, self.ref_mp_series):
            if len(series) > self.window:
                del series[0]

    # @utils.timeit
    def onTime(self, data):
        """Consume one tick row.

        Args:
            data: Flat list holding the traded-market quote followed by the
                reference-market quotes.  Anything that is not a list, or an
                empty list, is reported on stdout and otherwise ignored.
        """
        if not isinstance(data, list):
            print("行情数据为None")
            return
        if not data:
            print("行情数据为空")
            return

        self.loop += 1
        self.arr.append(data)
        self.processer()
        # Trim after processing so the newest row is always available above.
        self.check_length()

    # @utils.timeit
    def Get_ref(self, ref_ticker):
        """Return bias-corrected reference quotes.

        Args:
            ref_ticker: Sequence with one two-price pair per reference
                market (the ``__main__`` demo passes ``[bp, ap]``).

        Returns:
            A list with one two-element list per reference market, each
            price being ``price * alpha[i] - avg_spread[i]``.

        Raises:
            TypeError: If called before any non-empty tick was processed,
                because the averaged spread is still ``None``.
        """
        ref_mid = []
        for i in range(self.ref_num):
            scale = self.alpha[i]
            spread = self.avg_spread[i]
            first, second = ref_ticker[i][0], ref_ticker[i][1]
            ref_mid.append([first * scale - spread, second * scale - spread])
        return ref_mid
if __name__ == "__main__":
    # Offline replay: feed recorded ticks through a Predictor and plot the
    # traded mid price against the corrected mid of one reference market.
    import pandas as pd
    import numpy as np
    import matplotlib
    matplotlib.use('TkAgg')
    import matplotlib.pyplot as plt

    history = pd.read_csv('history/ftm_usdt_binance_usdt_swap.csv').values.tolist()

    def line_data_to_tickers(data, ref_num):
        """Cut one tick row into a ``[bp, ap]`` pair per reference market."""
        return [
            [
                data[utils.LEN * slot + utils.BP_INDEX],
                data[utils.LEN * slot + utils.AP_INDEX],
            ]
            for slot in range(1, ref_num + 1)
        ]

    # The first ``utils.LEN`` columns belong to the traded market; every
    # further group of ``utils.LEN`` columns is one reference market.
    n_refs = len(history[0]) // utils.LEN - 1
    predictor = Predictor(ref_name=["unkwon" for _ in range(n_refs)])

    # Position of the reference market that gets plotted.
    # NOTE(review): index 1 needs at least two reference markets in the CSV.
    plotted_ref = 1

    samples = []
    for tick in history:
        predictor.onTime(tick)
        traded_mid = (tick[utils.BP_INDEX] + tick[utils.AP_INDEX]) * 0.5
        corrected = predictor.Get_ref(line_data_to_tickers(tick, n_refs))
        chosen = corrected[plotted_ref]
        samples.append([traded_mid, (chosen[0] + chosen[1]) * 0.5])

    frame = pd.DataFrame(samples, columns=['mp', 'ref'])

    plt.figure()
    plt.plot(frame['mp'], 'k')
    plt.plot(frame['ref'], 'g')
    plt.grid()
    plt.show()
|