JiahengHe преди 1 година
родител
ревизия
35708599c5

+ 2 - 2
price_collection/src/main/java/com/liangjiang/price_collection/controller/ApiController.java

@@ -58,7 +58,7 @@ public class ApiController {
 //        Integer start = 1705916692;
 //        Integer end = 1705916714;
         try {
-            return R.ok(queryService.performQueries(exchange.split(","), coin, times[1], times[0]));
+            return R.ok(queryService.performQueries(exchange.split(","), coin, times[1], times[0], false));
 //            return queryService.performQueries(exchange.split(","), coin, start, end);
         } catch (Exception ex){
             log.error("查询出错", ex);
@@ -70,7 +70,7 @@ public class ApiController {
     public R getAddPrices(@RequestParam("exchange") String exchange, @RequestParam("coin")String coin,@RequestParam("startTime") Integer startTime){
         Integer endTime = Math.toIntExact(System.currentTimeMillis()/1000);
         try {
-            return R.ok(queryService.performQueries(exchange.split(","), coin, startTime, endTime));
+            return R.ok(queryService.performQueries(exchange.split(","), coin, startTime, endTime, true));
         } catch (Exception ex){
             log.error("查询出错", ex);
             return R.ok(PriceListDto.builder().time(new HashSet<>()).values(new LinkedList<>()).build());

+ 3 - 0
price_collection/src/main/java/com/liangjiang/price_collection/mapper/TableMapper.java

@@ -29,4 +29,7 @@ public interface TableMapper {
 
 
     List<PriceInfoDto> getListTest();
+
+    // 获取最近的价格
+    PriceDto getLastByTime(@Param("tableName") String tableName, @Param("startTime") Integer startTime);
 }

+ 3 - 0
price_collection/src/main/java/com/liangjiang/price_collection/service/ITableService.java

@@ -25,4 +25,7 @@ public interface ITableService {
 
     // 获取价格信息
     List<PriceDto> getPriceInfo(String tableName, Integer startTime, Integer endTime);
+
+    // 获取距离时间最近的价格
+    PriceDto getLastByTime(String tableName, Integer startTime);
 }

+ 11 - 8
price_collection/src/main/java/com/liangjiang/price_collection/service/impl/QueryService.java

@@ -27,13 +27,13 @@ public class QueryService {
         this.tableService = tableService;
     }
 
-    public PriceListDto performQueries(String[] exchanges, String coin, Integer startTime, Integer endTime) throws Exception {
+    public PriceListDto performQueries(String[] exchanges, String coin, Integer startTime, Integer endTime, boolean isAdd) throws Exception {
         List<Future<ExchangeCoinPriceDto>> futures = new ArrayList<>();
         Set<ExchangeCoinPriceDto> results = new HashSet<>();
         PriceListDto priceListDto = PriceListDto.builder().build();
         // 分发查询
         for (String exchange : exchanges) {
-            TableQueryTask queryTask = new TableQueryTask(tableService, coin, exchange, startTime, endTime);
+            TableQueryTask queryTask = new TableQueryTask(tableService, coin, exchange, startTime, endTime, isAdd);
             futures.add(executorService.submit(queryTask));
         }
         // 收集查询结果
@@ -48,11 +48,14 @@ public class QueryService {
         }
         List<Future<ExchangeCoinPriceArrDto>> resultFutures = new LinkedList<>();
         List<ExchangeCoinPriceArrDto> data = new LinkedList<>();
+        //
+        Set<Integer> times = generateArithmeticSequence(startTime, endTime);
+
         // 获取已有时间的合集
-        Set<Integer> times = new TreeSet<>();
-        results.forEach(dto -> {
-            times.addAll(dto.getPrices().keySet());
-        });
+//        Set<Integer> times = new TreeSet<>();
+//        results.forEach(dto -> {
+//            times.addAll(dto.getPrices().keySet());
+//        });
 
         for (ExchangeCoinPriceDto dto : results){
             DataAssembleTask task = new DataAssembleTask(dto, coin, times);
@@ -85,13 +88,13 @@ public class QueryService {
         return new Integer[]{startTimeInt, Math.toIntExact(threeDaysAgoSeconds)};
     }
 
-    public static List<Integer> generateArithmeticSequence(int start, int end) {
+    public static Set<Integer> generateArithmeticSequence(int start, int end) {
         if (start > end) {
             System.out.println("start = " + start + ", end = " + end);
             throw new IllegalArgumentException("首项必须小于或等于末项");
         }
 
-        List<Integer> sequence = new ArrayList<>();
+        Set<Integer> sequence = new TreeSet<>();
         for (int i = start; i <= end; i++) {
             sequence.add(i);
         }

+ 5 - 0
price_collection/src/main/java/com/liangjiang/price_collection/service/impl/TableServiceImpl.java

@@ -115,6 +115,11 @@ public class TableServiceImpl implements ITableService {
         return this.tableMapper.getPriceInfo(tableName, startTime, endTime);
     }
 
+    @Override
+    public PriceDto getLastByTime(String tableName, Integer startTime){
+        return this.tableMapper.getLastByTime(tableName, startTime);
+    }
+
     // 3-10: 最少有3个并发消费者,最多可以扩展到10个并发消费者。消息处理的顺序性无法保证(此处业务不涉及强顺序)
     @JmsListener(destination = "priceQueue", concurrency = "3-10")
     public void receivePriceInfo(ObjectMessage objectMessage){

+ 12 - 0
price_collection/src/main/java/com/liangjiang/price_collection/utils/TableQueryTask.java

@@ -1,5 +1,6 @@
 package com.liangjiang.price_collection.utils;
 
+import cn.hutool.core.collection.CollectionUtil;
 import com.liangjiang.price_collection.dto.ExchangeCoinPriceDto;
 import com.liangjiang.price_collection.dto.PriceDto;
 import com.liangjiang.price_collection.service.ITableService;
@@ -21,12 +22,23 @@ public class TableQueryTask implements Callable<ExchangeCoinPriceDto> {
     private String exchange;
     private Integer startTime;
     private Integer endTime;
+    // 是否是增量
+    private boolean isAdd;
 
     @Override
     public ExchangeCoinPriceDto call() {
         try {
             String tableName = String.format("%s_%s", this.coin.toUpperCase(), this.exchange.toUpperCase());
             List<PriceDto> price = tableService.getPriceInfo(tableName, this.startTime, this.endTime);
+            // 获取最近价格 增量
+            if(isAdd && CollectionUtil.isEmpty(price)){
+                PriceDto dto = tableService.getLastByTime(tableName, this.startTime);
+                if(dto != null){
+                    dto.setTime(this.startTime);
+                    price.add(dto);
+                }
+            }
+
             TreeMap<Integer, PriceDto> timePrice = price.stream().collect(Collectors.toMap(PriceDto::getTime, p -> p, (o, n) -> n, TreeMap::new));
             return ExchangeCoinPriceDto.builder().exchange(exchange).prices(timePrice).build();
         }catch (Exception ex){

+ 10 - 0
price_collection/src/main/resources/mapper/TableMapper.xml

@@ -49,4 +49,14 @@
     <select id="getPriceInfo" resultType="com.liangjiang.price_collection.dto.PriceDto">
         select id as `time`, bid, ask from ${tableName} where id &gt; ${startTime} and id &lt; ${endTime} order by id asc
     </select>
+
+    <select id="getLastByTime" resultType="com.liangjiang.price_collection.dto.PriceDto">
+        SELECT
+            id as `time`, bid, ask
+        FROM
+        ${tableName}
+        WHERE
+            id &lt; ${startTime}
+        LIMIT 1
+    </select>
 </mapper>