Prechádzať zdrojové kódy

统一请求返回体,增加消费者并行处理数

JiahengHe 1 rok pred
rodič
commit
0eca860ce6

+ 7 - 0
price_collection/src/main/java/com/liangjiang/price_collection/config/ActiveMQConfig.java

@@ -1,11 +1,18 @@
 package com.liangjiang.price_collection.config;
 
+import com.sun.jndi.ldap.pool.PooledConnectionFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.jms.core.JmsTemplate;
 
+import javax.jms.ConnectionFactory;
 import javax.jms.Queue;
 
+@Slf4j
 @Configuration
 public class ActiveMQConfig {
     @Bean(name = "priceQueue")

+ 17 - 16
price_collection/src/main/java/com/liangjiang/price_collection/controller/ApiController.java

@@ -2,6 +2,7 @@ package com.liangjiang.price_collection.controller;
 
 import com.liangjiang.price_collection.dto.PriceInfoDto;
 import com.liangjiang.price_collection.dto.PriceListDto;
+import com.liangjiang.price_collection.res.R;
 import com.liangjiang.price_collection.service.ITableService;
 import com.liangjiang.price_collection.service.impl.QueryService;
 import lombok.RequiredArgsConstructor;
@@ -13,7 +14,6 @@ import javax.jms.Queue;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
 
 @Slf4j
 @RestController
@@ -22,52 +22,53 @@ import java.util.Set;
 public class ApiController {
     private final ITableService tableService;
     private final QueryService queryService;
-    private final JmsTemplate jmsTemplate;
+    private final JmsTemplate myJmsTemplate;
     private final Queue priceQueue;
 
     @PostMapping("/add")
     public void saveInfo(@RequestBody List<PriceInfoDto> priceInfoDtos){
         // 开启持久化
-        jmsTemplate.setDeliveryMode(2);
-        jmsTemplate.setExplicitQosEnabled(true);
-        jmsTemplate.setDeliveryPersistent(true);
+        myJmsTemplate.setDeliveryMode(2);
+        myJmsTemplate.setExplicitQosEnabled(true);
+        myJmsTemplate.setDeliveryPersistent(true);
         // 写入队列
-        jmsTemplate.convertAndSend(priceQueue, priceInfoDtos);
+        myJmsTemplate.convertAndSend(priceQueue, priceInfoDtos);
     }
 
     @GetMapping("/getExchange")
-    public Set<String> exchangeSelect(){
-       return tableService.getExchangeSelect();
+    public R exchangeSelect(){
+
+       return R.ok(tableService.getExchangeSelect());
     }
 
     @GetMapping("/getCoin")
-    public Set<String> coinSelect(@RequestParam("exchange") String exchange){
+    public R coinSelect(@RequestParam("exchange") String exchange){
         String[] params = exchange.split(",");
-        return tableService.getCoinSelect(params);
+        return R.ok(tableService.getCoinSelect(params));
     }
 
     @GetMapping("/getPrices")
-    public PriceListDto getPriceRest(@RequestParam("exchange") String exchange, @RequestParam("coin")String coin){
+    public R getPriceRest(@RequestParam("exchange") String exchange, @RequestParam("coin")String coin){
         Integer[] times = QueryService.getDaysAgoTime(System.currentTimeMillis(), 3L);
 //        Integer start = 1705916692;
 //        Integer end = 1705916714;
         try {
-            return queryService.performQueries(exchange.split(","), coin, times[1], times[0]);
+            return R.ok(queryService.performQueries(exchange.split(","), coin, times[1], times[0]));
 //            return queryService.performQueries(exchange.split(","), coin, start, end);
         } catch (Exception ex){
             log.error("查询出错", ex);
-            return PriceListDto.builder().time(new HashSet<>()).values(new LinkedList<>()).build();
+            return R.ok(PriceListDto.builder().time(new HashSet<>()).values(new LinkedList<>()).build());
         }
     }
 
     @GetMapping("/getAddPrices")
-    public PriceListDto getAddPrices(@RequestParam("exchange") String exchange, @RequestParam("coin")String coin,@RequestParam("startTime") Integer startTime){
+    public R getAddPrices(@RequestParam("exchange") String exchange, @RequestParam("coin")String coin,@RequestParam("startTime") Integer startTime){
         Integer endTime = Math.toIntExact(System.currentTimeMillis()/1000);
         try {
-            return queryService.performQueries(exchange.split(","), coin, startTime, endTime);
+            return R.ok(queryService.performQueries(exchange.split(","), coin, startTime, endTime));
         } catch (Exception ex){
             log.error("查询出错", ex);
-            return PriceListDto.builder().time(new HashSet<>()).values(new LinkedList<>()).build();
+            return R.ok(PriceListDto.builder().time(new HashSet<>()).values(new LinkedList<>()).build());
         }
     }
 }

+ 70 - 0
price_collection/src/main/java/com/liangjiang/price_collection/res/R.java

@@ -0,0 +1,70 @@
+package com.liangjiang.price_collection.res;
+
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+
+/**
+ * 统一返回数据
+ * 特殊编码 5001 响应客户端,但是不用被提示框弹出
+ */
+//public class R  extends HashMap {
+@Data
+@Accessors(chain = true)
+public class R implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private int code;
+    private String msg;
+    private Object data;
+
+
+    public R() {
+        this.code = 200;
+        this.msg = "success";
+    }
+
+    public R(int code, String msg) {
+        this.code = code;
+        this.msg = msg;
+    }
+
+    public R(Object data) {
+        this.code = 200;
+        this.msg = "success";
+        this.data = data;
+    }
+
+    public R(Object data, String msg) {
+        this.code = 200;
+        this.msg = msg;
+        this.data = data;
+    }
+
+    public static R error(String msg) {
+        return error(700, msg);
+    }
+
+    public static R error(int code, String msg) {
+        return new R(code, msg);
+    }
+    public static R ok(Object data, String msg) {
+        return new R(data, msg);
+    }
+
+    public static R ok(Object data) {
+        return new R(data);
+    }
+
+    public static R ok() {
+        return new R();
+    }
+
+    public String toJsonString() {
+        return JSONObject.toJSONString(this);
+    }
+
+}

+ 2 - 1
price_collection/src/main/java/com/liangjiang/price_collection/service/impl/QueryService.java

@@ -51,10 +51,11 @@ public class QueryService {
         List<Future<ExchangeCoinPriceArrDto>> resultFutures = new LinkedList<>();
         List<ExchangeCoinPriceArrDto> data = new LinkedList<>();
         // 获取已有时间的合集
-        Set<Integer> times = new HashSet<>();
+        Set<Integer> times = new TreeSet<>();
         results.forEach(dto -> {
             times.addAll(dto.getPrices().keySet());
         });
+
         for (ExchangeCoinPriceDto dto : results){
             DataAssembleTask task = new DataAssembleTask(dto, coin, times);
             resultFutures.add(executorService.submit(task));

+ 2 - 1
price_collection/src/main/java/com/liangjiang/price_collection/service/impl/TableServiceImpl.java

@@ -148,7 +148,8 @@ public class TableServiceImpl implements ITableService {
         return this.tableMapper.getPriceInfo(tableName, startTime, endTime);
     }
 
-    @JmsListener(destination = "priceQueue")
+    // 3-10: 最少有3个并发消费者,最多可以扩展到10个并发消费者。消息处理的顺序性无法保证(此处业务不涉及强顺序)
+    @JmsListener(destination = "priceQueue", concurrency = "3-10")
     public void receivePriceInfo(ObjectMessage objectMessage){
         try {
            List<PriceInfoDto> param  = (List<PriceInfoDto>) objectMessage.getObject();

+ 1 - 1
price_collection/src/main/resources/mapper/TableMapper.xml

@@ -47,6 +47,6 @@
     </insert>
 
     <select id="getPriceInfo" resultType="com.liangjiang.price_collection.dto.PriceDto">
-        select id as `time`, bid, ask from ${tableName} where id between ${startTime} and ${endTime}
+        select id as `time`, bid, ask from ${tableName} where id &gt; ${startTime} and id &lt; ${endTime} order by id asc
     </select>
 </mapper>