瀏覽代碼

加入消息队列

JiahengHe 1 年之前
父節點
當前提交
d14656e885

+ 5 - 0
price_collection/pom.xml

@@ -33,6 +33,11 @@
             <groupId>com.alibaba</groupId>
             <groupId>com.alibaba</groupId>
             <artifactId>druid-spring-boot-starter</artifactId>
             <artifactId>druid-spring-boot-starter</artifactId>
         </dependency>
         </dependency>
+        <!-- activemq -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-activemq</artifactId>
+        </dependency>
         <!--json-->
         <!--json-->
         <dependency>
         <dependency>
             <groupId>com.alibaba</groupId>
             <groupId>com.alibaba</groupId>

+ 2 - 0
price_collection/src/main/java/com/liangjiang/price_collection/PriceCollectionApp.java

@@ -2,11 +2,13 @@ package com.liangjiang.price_collection;
 
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.jms.annotation.EnableJms;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.transaction.annotation.EnableTransactionManagement;
 import org.springframework.transaction.annotation.EnableTransactionManagement;
 
 
 import java.util.TimeZone;
 import java.util.TimeZone;
 
 
+@EnableJms
 @EnableScheduling
 @EnableScheduling
 @SpringBootApplication
 @SpringBootApplication
 @EnableTransactionManagement
 @EnableTransactionManagement

+ 15 - 0
price_collection/src/main/java/com/liangjiang/price_collection/config/ActiveMQConfig.java

@@ -0,0 +1,15 @@
+package com.liangjiang.price_collection.config;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.jms.Queue;
+
+@Configuration
+public class ActiveMQConfig {
+    @Bean(name = "priceQueue")
+    Queue priceQueue(){
+        return new ActiveMQQueue("priceQueue");
+    }
+}

+ 14 - 1
price_collection/src/main/java/com/liangjiang/price_collection/controller/ApiController.java

@@ -6,8 +6,11 @@ import com.liangjiang.price_collection.service.ITableService;
 import com.liangjiang.price_collection.service.impl.QueryService;
 import com.liangjiang.price_collection.service.impl.QueryService;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.jms.core.JmsMessagingTemplate;
+import org.springframework.jms.core.JmsTemplate;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.bind.annotation.*;
 
 
+import javax.jms.Queue;
 import java.util.LinkedList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.List;
 import java.util.Set;
 import java.util.Set;
@@ -19,10 +22,17 @@ import java.util.Set;
 public class ApiController {
 public class ApiController {
     private final ITableService tableService;
     private final ITableService tableService;
     private final QueryService queryService;
     private final QueryService queryService;
+    private final JmsTemplate jmsTemplate;
+    private final Queue priceQueue;
 
 
     @PostMapping("/add")
     @PostMapping("/add")
     public void saveInfo(@RequestBody List<PriceInfoDto> priceInfoDtos){
     public void saveInfo(@RequestBody List<PriceInfoDto> priceInfoDtos){
-        tableService.savePriceBatch(priceInfoDtos);
+        // 开启持久化
+        jmsTemplate.setDeliveryMode(2);
+        jmsTemplate.setExplicitQosEnabled(true);
+        jmsTemplate.setDeliveryPersistent(true);
+        // 写入队列
+        jmsTemplate.convertAndSend(priceQueue, priceInfoDtos);
     }
     }
 
 
     @GetMapping("/getExchange")
     @GetMapping("/getExchange")
@@ -39,8 +49,11 @@ public class ApiController {
     @GetMapping("/getPrices")
     @GetMapping("/getPrices")
     public PriceListDto getPriceRest(@RequestParam("exchange") String exchange, @RequestParam("coin")String coin){
     public PriceListDto getPriceRest(@RequestParam("exchange") String exchange, @RequestParam("coin")String coin){
         Integer[] times = QueryService.getDaysAgoTime(System.currentTimeMillis(), 3L);
         Integer[] times = QueryService.getDaysAgoTime(System.currentTimeMillis(), 3L);
+//        Integer start = 1705916692;
+//        Integer end = 1705916714;
         try {
         try {
             return queryService.performQueries(exchange.split(","), coin, times[1], times[0]);
             return queryService.performQueries(exchange.split(","), coin, times[1], times[0]);
+//            return queryService.performQueries(exchange.split(","), coin, start, end);
         } catch (Exception ex){
         } catch (Exception ex){
             log.error("查询出错", ex);
             log.error("查询出错", ex);
             return PriceListDto.builder().time(new LinkedList<>()).values(new LinkedList<>()).build();
             return PriceListDto.builder().time(new LinkedList<>()).values(new LinkedList<>()).build();

+ 3 - 1
price_collection/src/main/java/com/liangjiang/price_collection/dto/PriceInfoDto.java

@@ -2,8 +2,10 @@ package com.liangjiang.price_collection.dto;
 
 
 import lombok.Data;
 import lombok.Data;
 
 
+import java.io.Serializable;
+
 @Data
 @Data
-public class PriceInfoDto {
+public class PriceInfoDto implements Serializable {
     // 交易所
     // 交易所
     private String source;
     private String source;
     // 合约还是现货 swap spot
     // 合约还是现货 swap spot

+ 12 - 2
price_collection/src/main/java/com/liangjiang/price_collection/service/impl/TableServiceImpl.java

@@ -12,8 +12,10 @@ import com.liangjiang.price_collection.service.ITableService;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.jdbc.BadSqlGrammarException;
 import org.springframework.jdbc.BadSqlGrammarException;
+import org.springframework.jms.annotation.JmsListener;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 
 
+import javax.jms.ObjectMessage;
 import java.util.*;
 import java.util.*;
 
 
 @Slf4j
 @Slf4j
@@ -109,8 +111,6 @@ public class TableServiceImpl implements ITableService {
         }
         }
     }
     }
 
 
-
-
     @Override
     @Override
     public void savePrice(PriceInfoDto dto){
     public void savePrice(PriceInfoDto dto){
         String tableName = String.format("%s_%s_%s", dto.getCoin().replaceAll("_", ""), dto.getSource(), dto.getB_type()).toUpperCase();
         String tableName = String.format("%s_%s_%s", dto.getCoin().replaceAll("_", ""), dto.getSource(), dto.getB_type()).toUpperCase();
@@ -148,6 +148,16 @@ public class TableServiceImpl implements ITableService {
         return this.tableMapper.getPriceInfo(tableName, startTime, endTime);
         return this.tableMapper.getPriceInfo(tableName, startTime, endTime);
     }
     }
 
 
+    @JmsListener(destination = "priceQueue")
+    public void receivePriceInfo(ObjectMessage objectMessage){
+        try {
+           List<PriceInfoDto> param  = (List<PriceInfoDto>) objectMessage.getObject();
+           this.savePriceBatch(param);
+        } catch (Exception ex){
+            log.error("接收消息错误", ex);
+        }
+    }
+
     public static Set<String> getUnion(Set<Set<String>> sets) {
     public static Set<String> getUnion(Set<Set<String>> sets) {
         Set<String> result = new HashSet<>();
         Set<String> result = new HashSet<>();
         for (Set<String> set : sets) {
         for (Set<String> set : sets) {

+ 8 - 0
price_collection/src/main/resources/application.yml

@@ -39,6 +39,14 @@ spring:
       # 配置监控统计拦截的filters,去掉后监控界面sql无法统计。‘wall’用于防火墙
       # 配置监控统计拦截的filters,去掉后监控界面sql无法统计。‘wall’用于防火墙
 #      filters: stat,wall
 #      filters: stat,wall
 
 
+  activemq:
+    broker-url: tcp://127.0.0.1:61616
+    packages:
+      # 配置信任所有包 为了支持发送对象消息
+      trust-all: true
+    user: admin
+    password: admin
+
 #  redis:
 #  redis:
 #    # Redis服务器地址
 #    # Redis服务器地址
 #    host: localhost
 #    host: localhost