|
|
@@ -0,0 +1,92 @@
|
|
|
+package com.liangjiang.price_collection.service.impl;
|
|
|
+
|
|
|
+import cn.hutool.core.collection.CollectionUtil;
|
|
|
+import com.liangjiang.price_collection.dto.ExchangeCoinPriceArrDto;
|
|
|
+import com.liangjiang.price_collection.dto.ExchangeCoinPriceDto;
|
|
|
+import com.liangjiang.price_collection.dto.PriceDto;
|
|
|
+import com.liangjiang.price_collection.service.ITableService;
|
|
|
+import com.liangjiang.price_collection.utils.DataAssembleTask;
|
|
|
+import com.liangjiang.price_collection.utils.TableQueryTask;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+
|
|
|
+@Service
|
|
|
+public class QueryService {
|
|
|
+ private final ExecutorService executorService;
|
|
|
+ private final ITableService tableService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ public QueryService(ITableService tableService) {
|
|
|
+ this.executorService = Executors.newFixedThreadPool(5); // 根据需要调整线程池大小
|
|
|
+ this.tableService = tableService;
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<ExchangeCoinPriceArrDto> performQueries(String[] exchanges, String coin, Integer startTime, Integer endTime) throws Exception {
|
|
|
+ List<Future<ExchangeCoinPriceDto>> futures = new ArrayList<>();
|
|
|
+ Set<ExchangeCoinPriceDto> results = new HashSet<>();
|
|
|
+ // 分发查询
|
|
|
+ for (String exchange : exchanges) {
|
|
|
+ TableQueryTask queryTask = new TableQueryTask(tableService, coin, exchange, startTime, endTime);
|
|
|
+ futures.add(executorService.submit(queryTask));
|
|
|
+ }
|
|
|
+ // 收集查询结果
|
|
|
+ for (Future<ExchangeCoinPriceDto> future : futures) {
|
|
|
+ ExchangeCoinPriceDto dto = future.get();
|
|
|
+ results.add(dto);
|
|
|
+ }
|
|
|
+ if(CollectionUtil.isEmpty(results)){
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ List<Future<ExchangeCoinPriceArrDto>> resultFutures = new LinkedList<>();
|
|
|
+ List<ExchangeCoinPriceArrDto> data = new LinkedList<>();
|
|
|
+ // 获取时间段的所有秒级
|
|
|
+ List<Integer> times = generateArithmeticSequence(startTime, endTime);
|
|
|
+
|
|
|
+ for (ExchangeCoinPriceDto dto : results){
|
|
|
+ DataAssembleTask task = new DataAssembleTask(dto, coin, times);
|
|
|
+ resultFutures.add(executorService.submit(task));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 收集格式化结果
|
|
|
+ for (Future<ExchangeCoinPriceArrDto> future : resultFutures) {
|
|
|
+ ExchangeCoinPriceArrDto dto = future.get();
|
|
|
+ data.add(dto);
|
|
|
+ }
|
|
|
+ return data;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 用于优雅关闭服务
|
|
|
+ public void shutdown() {
|
|
|
+ executorService.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ public static Integer[] getDaysAgoTime(long startTime, long dayNum){
|
|
|
+ int startTimeInt = Math.toIntExact(startTime / 1000);
|
|
|
+ // 计算3天的毫秒数 (3天 * 24小时 * 60分钟 * 60秒 * 1000毫秒)
|
|
|
+ long threeDaysInMillis = dayNum * 24 * 60 * 60 * 1000;
|
|
|
+ // 计算3天前的毫秒级时间戳
|
|
|
+ long threeDaysAgoMillis = startTime - threeDaysInMillis;
|
|
|
+ // 将毫秒级时间戳转换为秒级时间戳
|
|
|
+ long threeDaysAgoSeconds = threeDaysAgoMillis / 1000;
|
|
|
+ return new Integer[]{startTimeInt, Math.toIntExact(threeDaysAgoSeconds)};
|
|
|
+ }
|
|
|
+
|
|
|
+ public static List<Integer> generateArithmeticSequence(int start, int end) {
|
|
|
+ if (start > end) {
|
|
|
+ System.out.println("start = " + start + ", end = " + end);
|
|
|
+ throw new IllegalArgumentException("首项必须小于或等于末项");
|
|
|
+ }
|
|
|
+
|
|
|
+ List<Integer> sequence = new ArrayList<>();
|
|
|
+ for (int i = start; i <= end; i++) {
|
|
|
+ sequence.add(i);
|
|
|
+ }
|
|
|
+ return sequence;
|
|
|
+ }
|
|
|
+}
|