|
@@ -24,21 +24,6 @@ import java.util.*;
|
|
|
public class TableServiceImpl implements ITableService {
|
|
public class TableServiceImpl implements ITableService {
|
|
|
private final TableMapper tableMapper;
|
|
private final TableMapper tableMapper;
|
|
|
private final IBackupsInfoService backupsInfoService;
|
|
private final IBackupsInfoService backupsInfoService;
|
|
|
- private final Interner<Object> interner = Interners.newWeakInterner();
|
|
|
|
|
-
|
|
|
|
|
- @Override
|
|
|
|
|
- public boolean createTable(String source, String coin, String type){
|
|
|
|
|
- String tableName = String.format("%s_%s_%s", coin.replaceAll("_", ""), source, type).toUpperCase();
|
|
|
|
|
- synchronized (interner.intern(tableName)){
|
|
|
|
|
- try {
|
|
|
|
|
- tableMapper.createTable(tableName);
|
|
|
|
|
- return true;
|
|
|
|
|
- } catch (Exception ex){
|
|
|
|
|
- log.error(tableName + "创建表失败", ex);
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public Set<String> getExchangeSelect (){
|
|
public Set<String> getExchangeSelect (){
|
|
@@ -111,37 +96,6 @@ public class TableServiceImpl implements ITableService {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- @Override
|
|
|
|
|
- public void savePrice(PriceInfoDto dto){
|
|
|
|
|
- String tableName = String.format("%s_%s_%s", dto.getCoin().replaceAll("_", ""), dto.getSource(), dto.getB_type()).toUpperCase();
|
|
|
|
|
- try {
|
|
|
|
|
- // 尝试先写入
|
|
|
|
|
- this.tableMapper.savePrice(tableName, dto.getTime(), dto.getAsk(), dto.getBid());
|
|
|
|
|
- } catch (BadSqlGrammarException ex){// 捕获表不存在的异常
|
|
|
|
|
- // 建表
|
|
|
|
|
- this.createTable(dto.getSource(), dto.getCoin(), dto.getB_type());
|
|
|
|
|
- try {
|
|
|
|
|
- // 写入
|
|
|
|
|
- this.tableMapper.savePrice(tableName, dto.getTime(), dto.getAsk(), dto.getBid());
|
|
|
|
|
- } catch (Exception er){
|
|
|
|
|
- // 写入存档表
|
|
|
|
|
- BackupsInfo backupsInfo = new BackupsInfo();
|
|
|
|
|
- backupsInfo.setTableName(tableName);
|
|
|
|
|
- backupsInfo.setAsk(dto.getAsk());
|
|
|
|
|
- backupsInfo.setBid(dto.getBid());
|
|
|
|
|
- backupsInfo.setTime(dto.getTime());
|
|
|
|
|
- backupsInfoService.save(backupsInfo);
|
|
|
|
|
- }
|
|
|
|
|
- } catch (Exception e){ // 其他异常
|
|
|
|
|
- // 写入存档表
|
|
|
|
|
- BackupsInfo backupsInfo = new BackupsInfo();
|
|
|
|
|
- backupsInfo.setTableName(tableName);
|
|
|
|
|
- backupsInfo.setAsk(dto.getAsk());
|
|
|
|
|
- backupsInfo.setBid(dto.getBid());
|
|
|
|
|
- backupsInfo.setTime(dto.getTime());
|
|
|
|
|
- backupsInfoService.save(backupsInfo);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public List<PriceDto> getPriceInfo(String tableName, Integer startTime, Integer endTime){
|
|
public List<PriceDto> getPriceInfo(String tableName, Integer startTime, Integer endTime){
|
|
@@ -151,12 +105,15 @@ public class TableServiceImpl implements ITableService {
|
|
|
// 3-10: 最少有3个并发消费者,最多可以扩展到10个并发消费者。消息处理的顺序性无法保证(此处业务不涉及强顺序)
|
|
// 3-10: 最少有3个并发消费者,最多可以扩展到10个并发消费者。消息处理的顺序性无法保证(此处业务不涉及强顺序)
|
|
|
@JmsListener(destination = "priceQueue", concurrency = "3-10")
|
|
@JmsListener(destination = "priceQueue", concurrency = "3-10")
|
|
|
public void receivePriceInfo(ObjectMessage objectMessage){
|
|
public void receivePriceInfo(ObjectMessage objectMessage){
|
|
|
|
|
+// long start = System.currentTimeMillis();
|
|
|
try {
|
|
try {
|
|
|
List<PriceInfoDto> param = (List<PriceInfoDto>) objectMessage.getObject();
|
|
List<PriceInfoDto> param = (List<PriceInfoDto>) objectMessage.getObject();
|
|
|
this.savePriceBatch(param);
|
|
this.savePriceBatch(param);
|
|
|
} catch (Exception ex){
|
|
} catch (Exception ex){
|
|
|
log.error("接收消息错误", ex);
|
|
log.error("接收消息错误", ex);
|
|
|
}
|
|
}
|
|
|
|
|
+// long end = System.currentTimeMillis();
|
|
|
|
|
+// System.out.println("耗时 = " + (start - end));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public static Set<String> getUnion(Set<Set<String>> sets) {
|
|
public static Set<String> getUnion(Set<Set<String>> sets) {
|