From bb99e48275ee19484793df39e0a659f7bd5678bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=B4=AB=E5=BD=B1233?= <1440196015@qq.com> Date: Wed, 23 Jul 2025 18:27:16 +0800 Subject: [PATCH] =?UTF-8?q?v2.1.0=20=E6=9B=B4=E6=96=B0=EF=BC=9A=20=20=20?= =?UTF-8?q?=20=201=E3=80=81=E5=BC=95=E5=85=A5metona-mq-mini-pro=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E9=98=9F=E5=88=97=EF=BC=8C=E9=87=8D=E6=9E=84=E5=AE=9E?= =?UTF-8?q?=E6=97=B6=E6=97=A5=E5=BF=97=E8=8E=B7=E5=8F=96=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 8 ++++- .../java/cn/somkit/fmt/FmtApplication.java | 13 +++++++ .../cn/somkit/fmt/action/LoggingAction.java | 14 ++++++-- .../cn/somkit/fmt/filter/LogStashFilter.java | 13 +++++-- .../fmt/socket/WebSocketServerHandler.java | 35 ++++++++++++++----- 5 files changed, 70 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index 2f8e538..94627f0 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ cn.somkit fmt - 2.0.1 + 2.1.0 fmt File Manage System for by SpringBoot @@ -51,6 +51,12 @@ metona-cache-spring-boot-starter 1.0.0 + + + cn.metona + metona-mq-mini-pro + 1.0.1 + diff --git a/src/main/java/cn/somkit/fmt/FmtApplication.java b/src/main/java/cn/somkit/fmt/FmtApplication.java index bb94f44..dd84694 100644 --- a/src/main/java/cn/somkit/fmt/FmtApplication.java +++ b/src/main/java/cn/somkit/fmt/FmtApplication.java @@ -1,13 +1,26 @@ package cn.somkit.fmt; +import cn.metona.mq.consumer.MessageListener; +import cn.metona.mq.core.Message; +import cn.metona.mq.exception.MessageConsumeException; +import cn.metona.mq.util.MetonaMQUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class FmtApplication { + private static final Logger logger = LoggerFactory.getLogger(FmtApplication.class); + public static void main(String[] args) { SpringApplication.run(FmtApplication.class, args); + + if(!MetonaMQUtil.isInitialized()){ + logger.info("Metona MQ Mini Pro 初始化..."); + MetonaMQUtil.init(); + } } } diff --git a/src/main/java/cn/somkit/fmt/action/LoggingAction.java b/src/main/java/cn/somkit/fmt/action/LoggingAction.java index 68d173f..f1e2a6f 100644 --- a/src/main/java/cn/somkit/fmt/action/LoggingAction.java +++ b/src/main/java/cn/somkit/fmt/action/LoggingAction.java @@ -1,6 +1,10 @@ package cn.somkit.fmt.action; import cn.metona.cache.Cache; +import cn.metona.mq.util.MetonaMQUtil; +import cn.somkit.fmt.socket.WebSocketServerHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; @@ -12,8 +16,10 @@ import org.springframework.web.bind.annotation.ResponseBody; @RequestMapping("/logging") public class LoggingAction { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + @Autowired - private Cache cache; + private Cache cache; @GetMapping("/index") public String index() throws Exception{ @@ -23,6 +29,10 @@ public class LoggingAction { @ResponseBody @PostMapping("/close") public void close(Boolean closed) throws Exception { - cache.put("closed", closed); + cache.put("closed", String.valueOf(closed)); + logger.info("Metona MQ Mini Pro 停止消费者(顺序消息)..."); + if(closed){ + MetonaMQUtil.stopOrderedConsuming(); + } } } diff --git a/src/main/java/cn/somkit/fmt/filter/LogStashFilter.java b/src/main/java/cn/somkit/fmt/filter/LogStashFilter.java index 2c28cac..3ae8c2a 100644 --- a/src/main/java/cn/somkit/fmt/filter/LogStashFilter.java +++ b/src/main/java/cn/somkit/fmt/filter/LogStashFilter.java @@ -5,8 +5,10 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; import cn.hutool.core.date.DateUtil; +import cn.hutool.json.JSONUtil; +import cn.metona.mq.exception.MessageSendException; +import cn.metona.mq.util.MetonaMQUtil; import cn.somkit.fmt.entity.LoggerMessage; -import cn.somkit.fmt.utils.LoggerQueue; import java.time.Instant; import java.time.ZoneId; @@ -27,7 +29,14 @@ public class LogStashFilter extends Filter { DateUtil.format(Instant.ofEpochMilli(e.getTimeStamp()).atZone(ZoneId.systemDefault()).toLocalDateTime(), "yyyy-MM-dd HH:mm:ss.SSS") ); - LoggerQueue.getInstance().push(msg); // 单例阻塞队列 + try { + System.out.println(MetonaMQUtil.getConsumerStatus()); + if(MetonaMQUtil.isInitialized() && MetonaMQUtil.isOrderedConsumerRunning()){ + MetonaMQUtil.send("log-topic", "log-monitor", JSONUtil.toJsonStr(msg)); + } + } catch (MessageSendException ex) { + System.out.println("发送消息队列失败"); + } return FilterReply.NEUTRAL; } diff --git a/src/main/java/cn/somkit/fmt/socket/WebSocketServerHandler.java b/src/main/java/cn/somkit/fmt/socket/WebSocketServerHandler.java index 15d399c..382fd41 100644 --- a/src/main/java/cn/somkit/fmt/socket/WebSocketServerHandler.java +++ b/src/main/java/cn/somkit/fmt/socket/WebSocketServerHandler.java @@ -1,10 +1,10 @@ package cn.somkit.fmt.socket; import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONUtil; import cn.metona.cache.Cache; -import cn.somkit.fmt.entity.LoggerMessage; -import cn.somkit.fmt.utils.LoggerQueue; +import cn.metona.mq.consumer.MessageListener; +import cn.metona.mq.core.Message; +import cn.metona.mq.util.MetonaMQUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -19,7 +19,7 @@ public class WebSocketServerHandler implements WebSocketHandler { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired - private Cache cache; + private Cache cache; @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { @@ -27,10 +27,29 @@ public class WebSocketServerHandler implements WebSocketHandler { } private void push(WebSocketSession session) throws IOException { - while (StrUtil.isBlankIfStr(cache.get("closed")) || !Boolean.parseBoolean(String.valueOf(cache.get("closed")))) { - LoggerMessage log = LoggerQueue.getInstance().poll(); - if(log != null){ - session.sendMessage(new TextMessage(JSONUtil.toJsonStr(log))); + boolean closed = StrUtil.isNotBlank(cache.get("closed")) && Boolean.parseBoolean(cache.get("closed")); + if(!closed){ + try { + if(MetonaMQUtil.isInitialized()){ + logger.info("Metona MQ Mini Pro 订阅主题(log-topic)(顺序消息)..."); + MetonaMQUtil.subscribeOrdered("log-topic", new MessageListener() { + @Override + public void onMessage(Message message) { + try { + session.sendMessage(new TextMessage(message.getBody())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + + if(!MetonaMQUtil.isOrderedConsumerRunning()){ + logger.info("Metona MQ Mini Pro 启动消费者(顺序消息)..."); + MetonaMQUtil.startOrderedConsuming(); + } + } + } catch (Exception e) { + logger.error("Metona MQ Mini Pro 异常", e); } } }