소스 검색

买一卖一价收集程序重构

JiahengHe 8 달 전
부모
커밋
308f46a889
20개의 변경된 파일345개의 추가작업 그리고 530개의 파일을 삭제
  1. 0 3
      hot_collection/src/main/java/com/liangjiang/hot_collection/controller/ApiController.java
  2. 9 22
      price_collection/pom.xml
  3. 0 2
      price_collection/src/main/java/com/liangjiang/price_collection/PriceCollectionApp.java
  4. 38 0
      price_collection/src/main/java/com/liangjiang/price_collection/config/RedisConfig.java
  5. 1 5
      price_collection/src/main/java/com/liangjiang/price_collection/controller/ApiController.java
  6. 0 28
      price_collection/src/main/java/com/liangjiang/price_collection/dto/BackupsInfo.java
  7. 4 0
      price_collection/src/main/java/com/liangjiang/price_collection/dto/PriceDto.java
  8. 0 4
      price_collection/src/main/java/com/liangjiang/price_collection/dto/PriceInfoDto.java
  9. 0 9
      price_collection/src/main/java/com/liangjiang/price_collection/mapper/BackupsInfoMapper.java
  10. 0 38
      price_collection/src/main/java/com/liangjiang/price_collection/mapper/TableMapper.java
  11. 0 7
      price_collection/src/main/java/com/liangjiang/price_collection/service/IBackupsInfoService.java
  12. 0 12
      price_collection/src/main/java/com/liangjiang/price_collection/service/impl/BackupsInfoServiceImpl.java
  13. 213 0
      price_collection/src/main/java/com/liangjiang/price_collection/service/impl/RedisServiceImpl.java
  14. 41 83
      price_collection/src/main/java/com/liangjiang/price_collection/service/impl/TableServiceImpl.java
  15. 11 10
      price_collection/src/main/java/com/liangjiang/price_collection/task/DataTask.java
  16. 28 61
      price_collection/src/main/resources/application.yml
  17. 0 91
      price_collection/src/main/resources/basic_application.yml.simple
  18. 0 4
      price_collection/src/main/resources/mapper/InfoBackupsMapper.xml
  19. 0 68
      price_collection/src/main/resources/mapper/TableMapper.xml
  20. 0 83
      price_collection/src/main/resources/write_application.yml.simple

+ 0 - 3
hot_collection/src/main/java/com/liangjiang/hot_collection/controller/ApiController.java

@@ -19,9 +19,6 @@ import org.springframework.web.bind.annotation.RestController;
 
 import java.util.*;
 
-import static com.liangjiang.hot_collection.aop.TaskHot.hotListBinance;
-import static com.liangjiang.hot_collection.aop.TaskHot.hotListGate;
-
 @Slf4j
 @RestController
 @RequiredArgsConstructor

+ 9 - 22
price_collection/pom.xml

@@ -16,23 +16,10 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.baomidou</groupId>
-            <artifactId>mybatis-plus-boot-starter</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-aop</artifactId>
         </dependency>
-        <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-        </dependency>
-        <!-- druid数据源驱动 -->
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>druid-spring-boot-starter</artifactId>
-        </dependency>
         <!-- activemq -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -44,15 +31,15 @@
             <artifactId>fastjson</artifactId>
         </dependency>
         <!-- 由于SpringBoot 2.x中默认并没有使用Redis连接池,所以需要添加commons-pool2的依赖 -->
-        <!--        <dependency>-->
-        <!--            <groupId>org.apache.commons</groupId>-->
-        <!--            <artifactId>commons-pool2</artifactId>-->
-        <!--        </dependency>-->
-        <!--        &lt;!&ndash;redis依赖配置&ndash;&gt;-->
-        <!--        <dependency>-->
-        <!--            <groupId>org.springframework.boot</groupId>-->
-        <!--            <artifactId>spring-boot-starter-data-redis</artifactId>-->
-        <!--        </dependency>-->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+        </dependency>
+        <!--redis依赖配置-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
         <!-- hutool  一个常用工具集 -->
         <dependency>
             <groupId>cn.hutool</groupId>

+ 0 - 2
price_collection/src/main/java/com/liangjiang/price_collection/PriceCollectionApp.java

@@ -4,7 +4,6 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.jms.annotation.EnableJms;
 import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.transaction.annotation.EnableTransactionManagement;
 
 import java.util.TimeZone;
 
@@ -14,7 +13,6 @@ import java.util.TimeZone;
 @EnableJms
 @EnableScheduling
 @SpringBootApplication
-@EnableTransactionManagement
 public class PriceCollectionApp {
     public static void main(String[] args) {
         // 设置时区

+ 38 - 0
price_collection/src/main/java/com/liangjiang/price_collection/config/RedisConfig.java

@@ -0,0 +1,38 @@
+package com.liangjiang.price_collection.config;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+@Configuration
+public class RedisConfig {
+
+    @Bean
+    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
+        RedisTemplate<String, Object> template = new RedisTemplate<>();
+        template.setConnectionFactory(factory);
+        
+        // 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
+        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
+        ObjectMapper om = new ObjectMapper();
+        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
+        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
+        jackson2JsonRedisSerializer.setObjectMapper(om);
+        
+        // 使用StringRedisSerializer来序列化和反序列化redis的key值
+        template.setKeySerializer(new StringRedisSerializer());
+        template.setValueSerializer(jackson2JsonRedisSerializer);
+        template.setHashKeySerializer(new StringRedisSerializer());
+        template.setHashValueSerializer(jackson2JsonRedisSerializer);
+        template.afterPropertiesSet();
+        
+        return template;
+    }
+}

+ 1 - 5
price_collection/src/main/java/com/liangjiang/price_collection/controller/ApiController.java

@@ -26,13 +26,9 @@ public class ApiController {
     private final JmsTemplate myJmsTemplate;
     private final Queue priceQueue;
 
-    @PostMapping("/init")
-    public R init(@RequestBody ExchangeCoinDto dto){
-        return R.ok(tableService.initExchange(dto));
-    }
-
     @PostMapping("/add")
     public void saveInfo(@RequestBody List<PriceInfoDto> priceInfoDtos){
+        log.info("开始添加 size: {}", priceInfoDtos.size());
         // 开启持久化
         myJmsTemplate.setDeliveryMode(2);
         myJmsTemplate.setExplicitQosEnabled(true);

+ 0 - 28
price_collection/src/main/java/com/liangjiang/price_collection/dto/BackupsInfo.java

@@ -1,28 +0,0 @@
-package com.liangjiang.price_collection.dto;
-
-import com.baomidou.mybatisplus.annotation.TableField;
-import com.baomidou.mybatisplus.annotation.TableName;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@TableName("info_backups")
-public class BackupsInfo {
-    // 时间
-    @TableField("time")
-    private Integer time;
-    // 卖价
-    @TableField("ask")
-    private String ask;
-    // 买价
-    @TableField("bid")
-    private String bid;
-    // 表名
-    @TableField("table_name")
-    private  String tableName;
-}

+ 4 - 0
price_collection/src/main/java/com/liangjiang/price_collection/dto/PriceDto.java

@@ -1,8 +1,12 @@
 package com.liangjiang.price_collection.dto;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 @Data
+@NoArgsConstructor
+@AllArgsConstructor
 public class PriceDto {
     // 时间戳 秒级
     private Integer time;

+ 0 - 4
price_collection/src/main/java/com/liangjiang/price_collection/dto/PriceInfoDto.java

@@ -18,8 +18,4 @@ public class PriceInfoDto implements Serializable {
     private String bid;
     // 卖价
     private String ask;
-
-    public BackupsInfo convertAdd(){
-        return BackupsInfo.builder().time(this.getTime()).ask(this.getAsk()).bid(this.getBid()).tableName(String.format("%s_%s_%s", this.getCoin().replaceAll("_", ""), this.getSource(), this.getB_type()).toUpperCase()).build();
-    }
 }

+ 0 - 9
price_collection/src/main/java/com/liangjiang/price_collection/mapper/BackupsInfoMapper.java

@@ -1,9 +0,0 @@
-package com.liangjiang.price_collection.mapper;
-
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.liangjiang.price_collection.dto.BackupsInfo;
-import org.apache.ibatis.annotations.Mapper;
-
-@Mapper
-public interface BackupsInfoMapper extends BaseMapper<BackupsInfo> {
-}

+ 0 - 38
price_collection/src/main/java/com/liangjiang/price_collection/mapper/TableMapper.java

@@ -1,38 +0,0 @@
-package com.liangjiang.price_collection.mapper;
-
-import com.liangjiang.price_collection.dto.BackupsInfo;
-import com.liangjiang.price_collection.dto.PriceDto;
-import com.liangjiang.price_collection.dto.PriceInfoDto;
-import org.apache.ibatis.annotations.Mapper;
-import org.apache.ibatis.annotations.Param;
-
-import java.sql.SQLSyntaxErrorException;
-import java.util.List;
-import java.util.Set;
-
-@Mapper
-public interface TableMapper {
-    // 创建表
-    void createTable(@Param("tableName")String tableName);
-    // 批量创建表
-    void createTables(@Param("tableNames")Set<String> tableNames);
-    // 删除数据
-    void deleteData(@Param("tableNames")List<String> tableNames, @Param("time")Integer time);
-
-    // 获取所有表名
-    List<String> getTableName();
-
-    void savePrices(@Param("priceInfos") List<BackupsInfo> priceInfos);
-
-    // 保存价格信息
-    void savePrice(@Param("tableName") String tableName, @Param("id") Integer id, @Param("ask") String ask,@Param("bid") String bid) throws SQLSyntaxErrorException;
-
-    // 获取价格列表
-    List<PriceDto> getPriceInfo(@Param("tableName") String tableName, @Param("startTime") Integer startTime, @Param("endTime") Integer endTime);
-
-
-    List<PriceInfoDto> getListTest();
-
-    // 获取最近的价格
-    PriceDto getLastByTime(@Param("tableName") String tableName, @Param("startTime") Integer startTime);
-}

+ 0 - 7
price_collection/src/main/java/com/liangjiang/price_collection/service/IBackupsInfoService.java

@@ -1,7 +0,0 @@
-package com.liangjiang.price_collection.service;
-
-import com.baomidou.mybatisplus.extension.service.IService;
-import com.liangjiang.price_collection.dto.BackupsInfo;
-
-public interface IBackupsInfoService extends IService<BackupsInfo> {
-}

+ 0 - 12
price_collection/src/main/java/com/liangjiang/price_collection/service/impl/BackupsInfoServiceImpl.java

@@ -1,12 +0,0 @@
-package com.liangjiang.price_collection.service.impl;
-
-import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.liangjiang.price_collection.dto.BackupsInfo;
-import com.liangjiang.price_collection.mapper.BackupsInfoMapper;
-import com.liangjiang.price_collection.service.IBackupsInfoService;
-import org.springframework.stereotype.Service;
-
-@Service
-public class BackupsInfoServiceImpl extends ServiceImpl<BackupsInfoMapper, BackupsInfo> implements IBackupsInfoService {
-
-}

+ 213 - 0
price_collection/src/main/java/com/liangjiang/price_collection/service/impl/RedisServiceImpl.java

@@ -0,0 +1,213 @@
+package com.liangjiang.price_collection.service.impl;
+
+import com.liangjiang.price_collection.dto.PriceDto;
+import com.liangjiang.price_collection.dto.PriceInfoDto;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class RedisServiceImpl {
+    private final RedisTemplate<String, Object> redisTemplate;
+    
+    // Redis键前缀
+    private static final String PRICE_KEY_PREFIX = "price:";
+    private static final String EXCHANGE_COIN_KEY = "exchange:coins";
+    private static final int DATA_EXPIRE_DAYS = 3;
+    
+    /**
+     * 保存价格数据到Redis
+     */
+    public void savePriceData(List<PriceInfoDto> dtos) {
+        if (dtos == null || dtos.isEmpty()) {
+            return;
+        }
+        
+        try {
+            for (PriceInfoDto dto : dtos) {
+                String coin = dto.getCoin().replaceAll("_", "").toUpperCase();
+                String exchange = dto.getSource().toUpperCase();
+                String key = PRICE_KEY_PREFIX + coin + ":" + exchange;
+                
+                // 使用Hash结构存储,时间戳作为field
+                redisTemplate.opsForHash().put(key, dto.getTime().toString(), 
+                        new PriceDto(dto.getTime(), dto.getBid(), dto.getAsk()));
+                
+                // 设置过期时间
+                redisTemplate.expire(key, DATA_EXPIRE_DAYS, TimeUnit.DAYS);
+                
+                // 记录交易所和币对的对应关系
+                redisTemplate.opsForSet().add(EXCHANGE_COIN_KEY + ":" + exchange, coin);
+            }
+        } catch (Exception e) {
+            log.error("保存价格数据到Redis失败", e);
+        }
+    }
+    
+    /**
+     * 获取两个交易所共有的币对
+     */
+    public Set<String> getCommonCoins(String[] exchanges) {
+        if (exchanges == null || exchanges.length < 2) {
+            return new HashSet<>();
+        }
+        
+        // 获取第一个交易所的币对
+        Set<Object> firstExchangeCoins = redisTemplate.opsForSet().members(EXCHANGE_COIN_KEY + ":" + exchanges[0].toUpperCase());
+        if (firstExchangeCoins == null || firstExchangeCoins.isEmpty()) {
+            return new HashSet<>();
+        }
+        
+        // 转换为字符串集合
+        Set<String> commonCoins = firstExchangeCoins.stream()
+                .map(Object::toString)
+                .collect(Collectors.toSet());
+        
+        // 与其他交易所取交集
+        for (int i = 1; i < exchanges.length; i++) {
+            Set<Object> currentExchangeCoins = redisTemplate.opsForSet().members(EXCHANGE_COIN_KEY + ":" + exchanges[i].toUpperCase());
+            if (currentExchangeCoins == null || currentExchangeCoins.isEmpty()) {
+                return new HashSet<>();
+            }
+            
+            Set<String> currentCoins = currentExchangeCoins.stream()
+                    .map(Object::toString)
+                    .collect(Collectors.toSet());
+            
+            // 取交集
+            commonCoins.retainAll(currentCoins);
+            
+            if (commonCoins.isEmpty()) {
+                return commonCoins;
+            }
+        }
+        
+        return commonCoins;
+    }
+    public Set<String> getCommonCoins(String exchange1, String exchange2) {
+        Set<Object> coins1 = redisTemplate.opsForSet().members(EXCHANGE_COIN_KEY + ":" + exchange1.toUpperCase());
+        Set<Object> coins2 = redisTemplate.opsForSet().members(EXCHANGE_COIN_KEY + ":" + exchange2.toUpperCase());
+        
+        if (coins1 == null || coins2 == null) {
+            return new HashSet<>();
+        }
+        
+        // 计算交集
+        Set<String> result = new HashSet<>();
+        for (Object coin : coins1) {
+            if (coins2.contains(coin)) {
+                result.add(coin.toString());
+            }
+        }
+        return result;
+    }
+    /**
+     * 获取所有交易所
+     */
+    public Set<String> getAllExchanges() {
+        Set<String> keys = redisTemplate.keys(EXCHANGE_COIN_KEY + ":*");
+        if (keys == null || keys.isEmpty()) {
+            return new HashSet<>();
+        }
+        
+        return keys.stream()
+                .map(key -> key.substring((EXCHANGE_COIN_KEY + ":").length()))
+                .collect(Collectors.toSet());
+    }
+    /**
+     * 根据时间范围查询价格数据
+     */
+    public List<PriceDto> getPriceData(String coin, String exchange, Integer startTime, Integer endTime) {
+        String key = PRICE_KEY_PREFIX + coin.toUpperCase() + ":" + exchange.toUpperCase();
+        
+        // 获取所有时间戳
+        Set<Object> fields = redisTemplate.opsForHash().keys(key);
+        if (fields.isEmpty()) {
+            return new ArrayList<>();
+        }
+        
+        // 过滤时间范围内的数据
+        List<String> timeFields = fields.stream()
+                .map(Object::toString)
+                .filter(time -> {
+                    int timestamp = Integer.parseInt(time);
+                    return timestamp >= startTime && timestamp <= endTime;
+                })
+                .collect(Collectors.toList());
+        
+        if (timeFields.isEmpty()) {
+            return new ArrayList<>();
+        }
+        
+        // 批量获取数据
+        List<Object> values = redisTemplate.opsForHash().multiGet(key, new ArrayList<>(timeFields));
+        
+        return values.stream()
+                .map(value -> (PriceDto) value)
+                .sorted(Comparator.comparing(PriceDto::getTime))
+                .collect(Collectors.toList());
+    }
+    
+    /**
+     * 获取最近的价格数据
+     */
+    public PriceDto getLastPriceData(String coin, String exchange, Integer time) {
+        String key = PRICE_KEY_PREFIX + coin.toUpperCase() + ":" + exchange.toUpperCase();
+        
+        // 获取所有时间戳
+        Set<Object> fields = redisTemplate.opsForHash().keys(key);
+        if (fields.isEmpty()) {
+            return null;
+        }
+        
+        // 找到小于等于指定时间的最大时间戳
+        Optional<Integer> lastTime = fields.stream()
+                .map(field -> Integer.parseInt(field.toString()))
+                .filter(t -> t <= time)
+                .max(Integer::compareTo);
+        
+        if (!lastTime.isPresent()) {
+            return null;
+        }
+        
+        Object value = redisTemplate.opsForHash().get(key, lastTime.get().toString());
+        return value != null ? (PriceDto) value : null;
+    }
+    
+    /**
+     * 清理过期数据
+     */
+    public void cleanExpiredData() {
+        int thresholdTime = (int) (System.currentTimeMillis() / 1000 - DATA_EXPIRE_DAYS * 24 * 60 * 60);
+        
+        Set<String> priceKeys = redisTemplate.keys(PRICE_KEY_PREFIX + "*");
+        if (priceKeys.isEmpty()) {
+            return;
+        }
+        
+        for (String key : priceKeys) {
+            Set<Object> fields = redisTemplate.opsForHash().keys(key);
+            if (fields.isEmpty()) {
+                continue;
+            }
+            
+            // 找出过期的时间戳
+            List<String> expiredFields = fields.stream()
+                    .map(Object::toString)
+                    .filter(time -> Integer.parseInt(time) < thresholdTime)
+                    .collect(Collectors.toList());
+            
+            if (!expiredFields.isEmpty()) {
+                // 批量删除过期数据
+                redisTemplate.opsForHash().delete(key, expiredFields.toArray());
+            }
+        }
+    }
+}

+ 41 - 83
price_collection/src/main/java/com/liangjiang/price_collection/service/impl/TableServiceImpl.java

@@ -1,12 +1,9 @@
 package com.liangjiang.price_collection.service.impl;
 
 import cn.hutool.core.collection.CollectionUtil;
-import com.liangjiang.price_collection.dto.BackupsInfo;
 import com.liangjiang.price_collection.dto.ExchangeCoinDto;
 import com.liangjiang.price_collection.dto.PriceDto;
 import com.liangjiang.price_collection.dto.PriceInfoDto;
-import com.liangjiang.price_collection.mapper.TableMapper;
-import com.liangjiang.price_collection.service.IBackupsInfoService;
 import com.liangjiang.price_collection.service.ITableService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -20,131 +17,92 @@ import java.util.*;
 @Service
 @RequiredArgsConstructor
 public class TableServiceImpl implements ITableService {
-    private final TableMapper tableMapper;
-    private final IBackupsInfoService backupsInfoService;
+    private final RedisServiceImpl redisService;
 
     @Override
-    public Set<String> getExchangeSelect (){
-        List<String> tableNames = this.getTableNames();
-        if (CollectionUtil.isEmpty(tableNames)){
-            return new HashSet<>();
-        }
-        Set<String> exchanges = new HashSet<>();
-
-        for (String tableName : tableNames){
-            String[] infos = tableName.split("_");
-            if(infos.length != 3){
-//                System.out.println("exchange = " + tableName);
-                continue;
-            }
-            // 交易所和类型
-            exchanges.add(String.format("%s_%s", infos[1], infos[2]));
-        }
-        return exchanges;
+    public Set<String> getExchangeSelect() {
+        // 使用Redis服务获取所有交易所
+        return redisService.getAllExchanges();
     }
-
+    
     @Override
-    public Set<String> getCoinSelect(String[] exchanges){
-        if(exchanges == null || exchanges.length == 0){
-            return new HashSet<>();
-        }
-        List<String> tableNames = this.getTableNames();
-        if (CollectionUtil.isEmpty(tableNames)){
+    public Set<String> getCoinSelect(String[] exchanges) {
+        if (exchanges == null || exchanges.length < 2) {
             return new HashSet<>();
         }
-        Map<String, Set<String>> exchange_coin = new HashMap<>();
-        for (String exchange : exchanges){
-            exchange_coin.put(exchange, new HashSet<>());
-        }
-        for (String tableName : tableNames){
-            String[] infos = tableName.split("_");
-            if(infos.length != 3){
-//                System.out.println("exchange = " + tableName);
-                continue;
-            }
-            // 交易所和类型
-            String key = String.format("%s_%s", infos[1], infos[2]);
-            if (exchange_coin.containsKey(key)){
-                // 存在加入币对
-                exchange_coin.get(key).add(infos[0]);
-            }
-        }
-        Set<Set<String>> values = new HashSet<>(exchange_coin.values());
-        // 返回交集
-        return getUnion(values);
+        
+        // 使用Redis服务获取共有币对
+        return redisService.getCommonCoins(exchanges);
     }
 
     @Override
     public List<String> getTableNames(){
-        return tableMapper.getTableName();
+        // 在Redis模式下不需要表名
+        return new ArrayList<>();
     }
 
     @Override
     public boolean initExchange(ExchangeCoinDto dto){
-        if(dto ==null || CollectionUtil.isEmpty(dto.getCoin())){
-            return false;
-        }
-        try {
-            Set<String> tableNames = new HashSet<>();
-            dto.getCoin().forEach(coin -> {
-                String tableName = String.format("%s_%s_%s", coin.replaceAll("_", ""), dto.getSource(), dto.getB_type()).toUpperCase();
-                tableNames.add(tableName);
-            });
-            // 建表
-            this.tableMapper.createTables(tableNames);
-            return true;
-        }catch (Exception ex){
-            log.error("建表初始化失败!", ex);
-            return false;
-        }
+        // Redis模式下不需要初始化表
+        return true;
     }
 
     @Override
     public void deleteData(List<String> tableNames, Integer time){
-        this.tableMapper.deleteData(tableNames, time);
+        // 清理过期数据由Redis的过期机制和定时任务处理
     }
 
     @Override
     public void savePriceBatch(List<PriceInfoDto> dtos){
-        if(CollectionUtil.isEmpty(dtos)){
+        if (CollectionUtil.isEmpty(dtos)) {
             return;
         }
-        List<BackupsInfo> addDtos = new LinkedList<>();
-        dtos.forEach(dto -> {
-            addDtos.add(dto.convertAdd());
-        });
+        
         try {
-            // 写入数据
-            this.tableMapper.savePrices(addDtos);
-        } catch (Exception ex){
-            log.error("插入报错", ex);
-            backupsInfoService.saveBatch(addDtos);
+            // 使用Redis服务保存数据
+            redisService.savePriceData(dtos);
+        } catch (Exception ex) {
+            log.error("保存价格数据失败", ex);
         }
     }
 
-
     @Override
     public List<PriceDto> getPriceInfo(String tableName, Integer startTime, Integer endTime){
-        return this.tableMapper.getPriceInfo(tableName, startTime, endTime);
+        // 从表名解析币对和交易所
+        String[] parts = tableName.split("_");
+        if (parts.length < 2) {
+            return new ArrayList<>();
+        }
+        
+        String coin = parts[0];
+        String exchange = parts[1];
+        
+        return redisService.getPriceData(coin, exchange, startTime, endTime);
     }
 
     @Override
     public PriceDto getLastByTime(String tableName, Integer startTime){
-        return this.tableMapper.getLastByTime(tableName, startTime);
+        // 从表名解析币对和交易所
+        String[] parts = tableName.split("_");
+        if (parts.length < 2) {
+            return null;
+        }
+        
+        String coin = parts[0];
+        String exchange = parts[1];
+        
+        return redisService.getLastPriceData(coin, exchange, startTime);
     }
 
     // 3-10: 最少有3个并发消费者,最多可以扩展到10个并发消费者。消息处理的顺序性无法保证(此处业务不涉及强顺序)
     @JmsListener(destination = "priceQueue", concurrency = "5-10")
     public void receivePriceInfo(ObjectMessage objectMessage){
-//        long start = System.currentTimeMillis();
         try {
            List<PriceInfoDto> param  = (List<PriceInfoDto>) objectMessage.getObject();
            this.savePriceBatch(param);
         } catch (Exception ex){
             log.error("接收消息错误", ex);
         }
-//        long end = System.currentTimeMillis();
-//        System.out.println("耗时 = " + (start - end));
     }
 
     public static Set<String> getUnion(Set<Set<String>> sets) {

+ 11 - 10
price_collection/src/main/java/com/liangjiang/price_collection/task/DataTask.java

@@ -1,28 +1,29 @@
 package com.liangjiang.price_collection.task;
 
-import com.liangjiang.price_collection.service.ITableService;
-import com.liangjiang.price_collection.service.impl.QueryService;
+import com.liangjiang.price_collection.service.impl.RedisServiceImpl;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
 
+@Slf4j
 @Component
 @RequiredArgsConstructor
 public class DataTask {
 
-    private final ITableService tableService;
+    private final RedisServiceImpl redisService;
     // 每天1点30
     @Scheduled(cron="0 30 1 * * ? ")
     public void deleteData() {
-        // 获取3天前时间
-        Integer[] time = QueryService.getDaysAgoTime(System.currentTimeMillis(), 3);
-        Integer threeDayAgo = time[1];
-        // 获取所有表名
-        List<String> tableNames = tableService.getTableNames();
-        // 清除数据
-        tableService.deleteData(tableNames, threeDayAgo);
+        log.info("开始清理过期价格数据");
+        try {
+            redisService.cleanExpiredData();
+            log.info("清理过期价格数据完成");
+        } catch (Exception e) {
+            log.error("清理价格数据失败", e);
+        }
     }
 
 }

+ 28 - 61
price_collection/src/main/resources/application.yml

@@ -14,6 +14,10 @@ server:
 logging:
   config: classpath:log.xml
 spring:
+  autoconfigure:
+    exclude:
+      - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
+      - org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration
   application:
     name: price_collection
   main:
@@ -22,70 +26,33 @@ spring:
   # 全局配置响应日期格式和时区为东八区,解决日期类型返回前端少八个小时的问题
   jackson:
     time-zone: GMT+8
-  datasource:
-    username: root
-    password: qwe123...
-    url: jdbc:mysql://4lapi.skyfffire.com:3306/one_price_warehouse?characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true
-    driver-class-name: com.mysql.cj.jdbc.Driver
-    type: com.alibaba.druid.pool.DruidDataSource
-    druid:
-      initial-size: 20
-      max-active: 80
-      min-idle: 20
-      max-wait: 60000
-      timeBetweenEvictionRunsMillis: 60000
-      minEvictableIdleTimeMillis: 1800000
-      validationQuery: SELECT 1
-      testWhileIdle: true
-      testOnBorrow: false
-      testOnReturn: false
-      poolPreparedStatements: true
-      filter:
-        wall:
-          config:
-            none-base-statement-allow: true
-            multi-statement-allow: true
-      # 配置监控统计拦截的filters,去掉后监控界面sql无法统计。‘wall’用于防火墙
-  #      filters: stat,wall
 
   activemq:
-    broker-url: tcp://kline.skyfffire.com:61616
-#    broker-url: tcp://localhost:61616
+    broker-url: tcp://localhost:61616
     packages:
       # 配置信任所有包 为了支持发送对象消息
       trust-all: true
-    user: admin
-    password: admin
+    user: 4l
+    password: 4l43626546
 
-#  redis:
-#    # Redis服务器地址
-#    host: localhost
-#    # Redis数据库索引(默认为0)
-#    database: 0
-#    # Redis服务器连接端口
-#    port: 6379
-#    # Redis服务器连接密码(默认为空)
-#    password: 123456
-#    # 连接超时时间
-#    timeout: 1000ms
-#    lettuce:
-#      pool:
-#        # 连接池最大连接数
-#        max-active: 8
-#        # 连接池最大空闲连接数
-#        max-idle: 8
-#        # 连接池最小空闲连接数
-#        min-idle: 0
-#        # 连接池最大阻塞等待时间,负值表示没有限制
-#        max-wait: -1ms
-mybatis-plus:
-  mapper-locations: classpath*:mapper/**/*Mapper.xml
-  type-aliases-package: com.liangjiang.price_collection.domain
-  global-config:
-    banner: false
-    db-config:
-      id-type: input
-  configuration:
-    # 控制台SQL日志
-    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
-    map-underscore-to-camel-case: false
+  redis:
+    # Redis服务器地址
+    host: localhost
+    # Redis数据库索引(默认为0)
+    database: 1
+    # Redis服务器连接端口
+    port: 6379
+    # Redis服务器连接密码(默认为空)
+    password: 43626546
+    # 连接超时时间
+    timeout: 1000ms
+    lettuce:
+      pool:
+        # 连接池最大连接数
+        max-active: 8
+        # 连接池最大空闲连接数
+        max-idle: 8
+        # 连接池最小空闲连接数
+        min-idle: 0
+        # 连接池最大阻塞等待时间,负值表示没有限制
+        max-wait: -1ms

+ 0 - 91
price_collection/src/main/resources/basic_application.yml.simple

@@ -1,91 +0,0 @@
-server:
-  servlet:
-    context-path: /priceCollect
-  port: 82
-  tomcat:
-    uri-encoding: UTF-8
-  compression:
-    # 开启响应数据的压缩功能
-    enabled: true
-    # 设置可以被压缩的MIME类型列表
-    mime-types: application/json,application/xml,text/html,text/xml,text/plain
-    # 设置触发压缩的最小响应大小 2kb
-    min-response-size: 2048
-logging:
-  config: classpath:log.xml
-spring:
-  application:
-    name: price_collection
-  main:
-    allow-bean-definition-overriding: true
-    allow-circular-references: true
-  # 全局配置响应日期格式和时区为东八区,解决日期类型返回前端少八个小时的问题
-  jackson:
-    time-zone: GMT+8
-  datasource:
-    username: root
-    password: qwe123...
-    url: jdbc:mysql://kline.skyfffire.com:3306/one_price_warehouse?characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true
-    driver-class-name: com.mysql.cj.jdbc.Driver
-    type: com.alibaba.druid.pool.DruidDataSource
-    druid:
-      initial-size: 20
-      max-active: 80
-      min-idle: 20
-      max-wait: 60000
-      timeBetweenEvictionRunsMillis: 60000
-      minEvictableIdleTimeMillis: 1800000
-      validationQuery: SELECT 1
-      testWhileIdle: true
-      testOnBorrow: false
-      testOnReturn: false
-      poolPreparedStatements: true
-      filter:
-        wall:
-          config:
-            none-base-statement-allow: true
-            multi-statement-allow: true
-      # 配置监控统计拦截的filters,去掉后监控界面sql无法统计。‘wall’用于防火墙
-  #      filters: stat,wall
-
-  activemq:
-    broker-url: tcp://kline.skyfffire.com:61616
-#    broker-url: tcp://localhost:61616
-    packages:
-      # 配置信任所有包 为了支持发送对象消息
-      trust-all: true
-    user: admin
-    password: admin
-
-#  redis:
-#    # Redis服务器地址
-#    host: localhost
-#    # Redis数据库索引(默认为0)
-#    database: 0
-#    # Redis服务器连接端口
-#    port: 6379
-#    # Redis服务器连接密码(默认为空)
-#    password: 123456
-#    # 连接超时时间
-#    timeout: 1000ms
-#    lettuce:
-#      pool:
-#        # 连接池最大连接数
-#        max-active: 8
-#        # 连接池最大空闲连接数
-#        max-idle: 8
-#        # 连接池最小空闲连接数
-#        min-idle: 0
-#        # 连接池最大阻塞等待时间,负值表示没有限制
-#        max-wait: -1ms
-mybatis-plus:
-  mapper-locations: classpath*:mapper/**/*Mapper.xml
-  type-aliases-package: com.liangjiang.price_collection.domain
-  global-config:
-    banner: false
-    db-config:
-      id-type: input
-  configuration:
-    # 控制台SQL日志
-    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
-    map-underscore-to-camel-case: false

+ 0 - 4
price_collection/src/main/resources/mapper/InfoBackupsMapper.xml

@@ -1,4 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="com.liangjiang.price_collection.mapper.BackupsInfoMapper">
-</mapper>

+ 0 - 68
price_collection/src/main/resources/mapper/TableMapper.xml

@@ -1,68 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="com.liangjiang.price_collection.mapper.TableMapper">
-    <!-- 创建数据表 -->
-    <update id="createTable" parameterType="java.lang.String">
-        CREATE TABLE IF NOT EXISTS ${tableName} (
-         `id` INT UNSIGNED NOT NULL,
-         `bid` VARCHAR(128) NOT NULL,
-         `ask` VARCHAR(128) NOT NULL,
-         PRIMARY KEY (`id`)
-        )
-    </update>
-
-    <update id="createTables" parameterType="java.util.Set">
-        <foreach item="item" collection="tableNames" open="" separator=";" close="">
-            CREATE TABLE IF NOT EXISTS ${item} (
-            `id` INT UNSIGNED NOT NULL,
-            `bid` VARCHAR(128) NOT NULL,
-            `ask` VARCHAR(128) NOT NULL,
-            PRIMARY KEY (`id`)
-            )
-        </foreach>
-    </update>
-
-    <update id="deleteData">
-        <foreach item="item" collection="tableNames" open="" separator=";" close="">
-            delete FROM ${item} where id &lt; ${time}
-        </foreach>
-    </update>
-
-    <!-- 获取所有数据表格 -->
-    <select id="getTableName" resultType="java.lang.String">
-        SELECT
-            table_name
-        FROM
-            information_schema.TABLES
-        WHERE
-            table_type = 'BASE TABLE'
-          AND table_schema = 'one_price_warehouse'
-          and table_name != 'info_backups'
-        ORDER BY
-            table_name
-    </select>
-
-    <insert id="savePrices">
-        <foreach item="item" collection="priceInfos" open="" separator=";" close="">
-            INSERT ${item.tableName} ( id, bid, ask ) VALUE (${item.time}, ${item.bid}, ${item.ask})on duplicate key update bid = ${item.bid}, ask = ${item.ask}
-        </foreach>
-    </insert>
-
-    <insert id="savePrice">
-        INSERT ${tableName} ( id, bid, ask ) VALUE (${id}, ${bid}, ${ask})on duplicate key update bid = ${bid}, ask = ${ask}
-    </insert>
-
-    <select id="getPriceInfo" resultType="com.liangjiang.price_collection.dto.PriceDto">
-        select id as `time`, bid, ask from ${tableName} where id &gt; ${startTime} and id &lt; ${endTime} order by id asc
-    </select>
-
-    <select id="getLastByTime" resultType="com.liangjiang.price_collection.dto.PriceDto">
-        SELECT
-            id as `time`, bid, ask
-        FROM
-        ${tableName}
-        WHERE
-            id &lt; ${startTime}
-        LIMIT 1
-    </select>
-</mapper>

+ 0 - 83
price_collection/src/main/resources/write_application.yml.simple

@@ -1,83 +0,0 @@
-server:
-  servlet:
-    context-path: /priceCollect1
-  port: 83
-  tomcat:
-    uri-encoding: UTF-8
-logging:
-  config: classpath:log.xml
-spring:
-  application:
-    name: write_price_collection
-  main:
-    allow-bean-definition-overriding: true
-    allow-circular-references: true
-  # 全局配置响应日期格式和时区为东八区,解决日期类型返回前端少八个小时的问题
-  jackson:
-    time-zone: GMT+8
-  datasource:
-    username: root
-    password: qwe123...
-    url: jdbc:mysql://kline.skyfffire.com:3306/one_price_warehouse?characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true
-    driver-class-name: com.mysql.cj.jdbc.Driver
-    type: com.alibaba.druid.pool.DruidDataSource
-    druid:
-      initial-size: 20
-      max-active: 80
-      min-idle: 20
-      max-wait: 60000
-      timeBetweenEvictionRunsMillis: 60000
-      minEvictableIdleTimeMillis: 1800000
-      validationQuery: SELECT 1
-      testWhileIdle: true
-      testOnBorrow: false
-      testOnReturn: false
-      poolPreparedStatements: true
-      filter:
-        wall:
-          config:
-            none-base-statement-allow: true
-            multi-statement-allow: true
-      # 配置监控统计拦截的filters,去掉后监控界面sql无法统计。‘wall’用于防火墙
-#      filters: stat,wall
-
-  activemq:
-    broker-url: tcp://kline.skyfffire.com:61616
-    packages:
-      # 配置信任所有包 为了支持发送对象消息
-      trust-all: true
-    user: admin
-    password: admin
-
-#  redis:
-#    # Redis服务器地址
-#    host: localhost
-#    # Redis数据库索引(默认为0)
-#    database: 0
-#    # Redis服务器连接端口
-#    port: 6379
-#    # Redis服务器连接密码(默认为空)
-#    password: 123456
-#    # 连接超时时间
-#    timeout: 1000ms
-#    lettuce:
-#      pool:
-#        # 连接池最大连接数
-#        max-active: 8
-#        # 连接池最大空闲连接数
-#        max-idle: 8
-#        # 连接池最小空闲连接数
-#        min-idle: 0
-#        # 连接池最大阻塞等待时间,负值表示没有限制
-#        max-wait: -1ms
-mybatis-plus:
-  mapper-locations: classpath*:mapper/**/*Mapper.xml
-  type-aliases-package: com.liangjiang.price_collection.domain
-  global-config:
-    banner: false
-    db-config:
-      id-type: input
-  configuration:
-    # 控制台SQL日志
-    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
-    map-underscore-to-camel-case: false