| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- import time
- from decimal import Decimal
- import utils
- from util.instant_volatility import InstantVolatilityIndicator
- from util.trading_intensity import TradingIntensityIndicator
class PredictorNew:
    """Track a traded market against reference markets and quote a reference price.

    Each row passed to ``onTime`` is indexed with ``utils.BP_INDEX`` /
    ``utils.AP_INDEX`` for the traded market and with
    ``utils.LEN * (1 + i) + utils.BP_INDEX`` / ``... + utils.AP_INDEX`` for
    reference market ``i`` (presumably best bid / best ask prices -- confirm
    against ``utils``).

    Two independent pieces of state are maintained:

    * ``avg_spread`` -- per reference market, an exponentially smoothed bias
      between the alpha-scaled reference mid price and the traded mid price
      (updated by ``onTime``).
    * ``vol`` / ``trading_intensity`` -- indicators fed by ``market_update``
      and consumed by ``Get_ref``.

    NOTE: ``Get_ref`` does not read ``avg_spread``; an earlier revision derived
    the reference quotes as ``ref_price * alpha[i] - avg_spread[i]``.
    """

    def __init__(self, ref_name=None, alpha=None, gamma=0.999):
        """Initialise buffers, smoothing parameters and indicators.

        Args:
            ref_name: One entry per reference market; only its length is used.
                Defaults to ``["Unknown Market"]``.
            alpha: Per-reference-market multiplier applied to the reference mid
                price before the bias is taken. Defaults to 99 values of 1.0.
            gamma: Smoothing factor of the bias average once the warm-up
                phase (first 100 accepted rows) is over.
        """
        # ``None`` stands in for the historical list defaults so that no list
        # object is shared between instances (mutable-default pitfall).
        if ref_name is None:
            ref_name = ["Unknown Market"]
        if alpha is None:
            alpha = [1.0 for _ in range(99)]

        self.loop = 0  # number of non-empty rows accepted by onTime
        self.arr = []  # most recent raw rows
        self.trade_mp_series = []  # traded-market mid prices
        self.ref_mp_series = []  # per-row list of reference mid prices
        self.ref_num = len(ref_name)
        # Pricing: maximum number of entries kept in each buffer above.
        self.window = 10
        # Price coefficients.
        self.alpha = alpha
        # Reference smoothing factor.
        print('定价系数:', gamma)
        self.gamma = gamma
        self.avg_spread = [None for _ in range(self.ref_num)]
        self.vol = InstantVolatilityIndicator(sampling_length=100, processing_length=1)
        self.trading_intensity = TradingIntensityIndicator(sampling_length=20)

    def processer(self):
        """Derive mid prices from the newest row and refresh the average bias."""
        row = self.arr[-1]

        # Mid price of the traded market.
        self.trade_mp_series.append((row[utils.BP_INDEX] + row[utils.AP_INDEX]) * 0.5)

        # Mid price of every reference market.
        ref_mp = []
        for i in range(self.ref_num):
            offset = utils.LEN * (1 + i)
            bp = row[offset + utils.BP_INDEX]
            ap = row[offset + utils.AP_INDEX]
            ref_mp.append((bp + ap) * 0.5)
        self.ref_mp_series.append(ref_mp)

        # Bias calculation.
        self._update_avg_spread()

    def _update_avg_spread(self):
        """Update the exponentially smoothed bias of each reference market.

        The bias is ``ref_mid * alpha[i] - trade_mid`` for the newest sample.
        The first sample seeds the average; afterwards
        ``avg = avg * gamma + bias * (1 - gamma)``.
        """
        # Right after start-up gamma must not be too large, so a faster 0.9 is
        # used for the first 100 rows. The choice does not depend on the
        # reference index, hence it is made once per call.
        gamma = 0.9 if self.loop < 100 else self.gamma

        for i in range(self.ref_num):
            bias = self.ref_mp_series[-1][i] * self.alpha[i] - self.trade_mp_series[-1]
            if self.avg_spread[i] is None:
                self.avg_spread[i] = bias
            else:
                self.avg_spread[i] = self.avg_spread[i] * gamma + bias * (1 - gamma)

    def check_length(self):
        """Drop the oldest entry of every buffer that exceeds ``self.window``."""
        # Market data cache: at most one element is removed per call, which is
        # enough because onTime appends exactly one element per call.
        for series in (self.arr, self.trade_mp_series, self.ref_mp_series):
            if len(series) > self.window:
                del series[0]

    # @utils.timeit
    def onTime(self, data):
        """Consume one market row.

        Non-list input and empty lists are reported on stdout and ignored.

        Args:
            data: A list indexed as described in the class docstring.
        """
        if not isinstance(data, list):
            print("行情数据为None")
            return
        if not data:
            print("行情数据为空")
            return

        self.loop += 1
        self.arr.append(data)
        self.processer()
        self.check_length()

    def market_update(self, best_mid_price, time_num):
        """Feed the volatility and trading-intensity indicators.

        Args:
            best_mid_price: Passed to both indicators.
            time_num: Passed to ``TradingIntensityIndicator.calculate``
                (presumably a timestamp -- confirm against the indicator).
        """
        self.vol.add_sample(best_mid_price)
        self.trading_intensity.calculate(best_mid_price, time_num)

    @staticmethod
    def _as_decimal(value):
        """Return ``value`` as a ``Decimal`` (floats are converted exactly)."""
        if isinstance(value, Decimal):
            return value
        return Decimal(float(value))

    # @utils.timeit
    def Get_ref(self, ref_ticker):
        """Return reference quotes around the first reference market's mid price.

        The spread is ``gamma * std + 2 * ln(1 + gamma / kappa)`` with a fixed
        ``gamma`` of 0.5, ``std`` taken from ``self.vol.current_value`` and
        ``kappa`` from ``self.trading_intensity.current_value``.

        Args:
            ref_ticker: Sequence of ``[bp, ap]`` pairs; only ``ref_ticker[0]``
                is used.

        Returns:
            ``[[ask, bid]]`` (note the order: ask first) as ``Decimal`` values,
            or an empty list while ``alpha == 0``, ``kappa <= 0`` or
            ``std == 0``.
        """
        bp = ref_ticker[0][0]
        ap = ref_ticker[0][1]
        mp = Decimal((bp + ap) * 0.5)

        std = self.vol.current_value
        gamma = Decimal(0.5)
        alpha, kappa = self.trading_intensity.current_value

        ref_mid = []
        if alpha != 0 and kappa > 0 and std != 0:
            # Decimal refuses arithmetic with float operands (TypeError), so
            # the indicator outputs are normalised before they are combined.
            kappa = self._as_decimal(kappa)
            std = self._as_decimal(std)

            factor = Decimal(1) + gamma / kappa
            _optimal_spread = gamma * std
            _optimal_spread += 2 * factor.ln()

            _optimal_ask = mp + _optimal_spread / 2
            _optimal_bid = mp - _optimal_spread / 2
            ref_mid.append([_optimal_ask, _optimal_bid])
        return ref_mid
if __name__ == "__main__":
    # Offline replay: push every row of a recorded CSV through PredictorNew
    # and plot the traded mid price against the reference mid price.
    import pandas as pd
    import matplotlib
    matplotlib.use('TkAgg')
    import matplotlib.pyplot as plt

    arr = pd.read_csv('history/ftm_usdt_binance_usdt_swap.csv').values.tolist()

    def line_data_to_tickers(data, ref_num):
        """Extract the ``[bp, ap]`` pair of every reference market from one row."""
        return [
            [
                data[utils.LEN * (i + 1) + utils.BP_INDEX],
                data[utils.LEN * (i + 1) + utils.AP_INDEX],
            ]
            for i in range(ref_num)
        ]

    # Every market occupies utils.LEN columns; the first one is the traded market.
    ref_num = len(arr[0]) // utils.LEN - 1
    p = PredictorNew(ref_name=["unkwon" for _ in range(ref_num)])

    t = []
    # Get_ref returns at most one [ask, bid] pair, so the only valid index is 0
    # (the former value 1 always raised IndexError).
    ref_index = 0
    # NOTE(review): this replay never calls p.market_update(), so the
    # indicators read by Get_ref receive no samples here -- confirm whether
    # the rows carry a timestamp that should be fed to it.
    for data in arr:
        p.onTime(data)
        trade_mp = (data[utils.BP_INDEX] + data[utils.AP_INDEX]) * 0.5
        ref_price = p.Get_ref(line_data_to_tickers(data, ref_num))
        if len(ref_price) > ref_index:
            ask, bid = ref_price[ref_index]
            # The quotes are Decimal; convert before mixing with float math.
            ref_mid = float(ask + bid) * 0.5
        else:
            # No quote available yet: leave a gap in the plotted series.
            ref_mid = float('nan')
        t.append([trade_mp, ref_mid])

    t = pd.DataFrame(t, columns=['mp', 'ref'])

    plt.figure()
    plt.plot(t['mp'], 'k')
    plt.plot(t['ref'], 'g')
    plt.grid()
    plt.show()
|