From 5274830cc66be342661508f23e5ea0b5e3c06d3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=B4=AB=E5=BD=B1233?= <1440196015@qq.com> Date: Fri, 25 Jul 2025 14:32:21 +0800 Subject: [PATCH 1/2] =?UTF-8?q?v2.1.1=20=E6=9B=B4=E6=96=B0=EF=BC=9A=20=20?= =?UTF-8?q?=20=20=201=E3=80=81=E5=8D=87=E7=BA=A7metona-mq-mini-pro?= =?UTF-8?q?=E5=88=B02.0.0=EF=BC=8C=E9=87=8D=E6=9E=84=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E8=8E=B7=E5=8F=96=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 4 +- .../java/cn/somkit/fmt/FmtApplication.java | 6 --- .../cn/somkit/fmt/action/LoggingAction.java | 23 --------- .../somkit/fmt/config/LogMonitorConfig.java | 50 +++++++++++++++++++ .../cn/somkit/fmt/filter/LogStashFilter.java | 16 +++--- .../cn/somkit/fmt/socket/SocketManage.java | 31 ++++++++++++ .../fmt/socket/WebSocketServerHandler.java | 49 ++---------------- .../java/cn/somkit/fmt/utils/LoggerQueue.java | 28 ----------- src/main/resources/templates/logging.html | 20 +------- 9 files changed, 97 insertions(+), 130 deletions(-) create mode 100644 src/main/java/cn/somkit/fmt/config/LogMonitorConfig.java create mode 100644 src/main/java/cn/somkit/fmt/socket/SocketManage.java delete mode 100644 src/main/java/cn/somkit/fmt/utils/LoggerQueue.java diff --git a/pom.xml b/pom.xml index 88f1555..a9699f1 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ cn.somkit fmt - 2.1.0 + 2.1.1 fmt File Manage System for by SpringBoot @@ -55,7 +55,7 @@ cn.metona metona-mq-mini-pro - 1.0.2 + 2.0.0 diff --git a/src/main/java/cn/somkit/fmt/FmtApplication.java b/src/main/java/cn/somkit/fmt/FmtApplication.java index 513204a..c3245db 100644 --- a/src/main/java/cn/somkit/fmt/FmtApplication.java +++ b/src/main/java/cn/somkit/fmt/FmtApplication.java @@ -1,6 +1,5 @@ package cn.somkit.fmt; -import cn.metona.mq.util.MetonaMQUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; @@ -13,11 +12,6 @@ public class FmtApplication { public static void main(String[] args) { SpringApplication.run(FmtApplication.class, args); - - if(!MetonaMQUtil.isInitialized()){ - logger.info("Metona MQ Mini Pro 初始化..."); - MetonaMQUtil.init(); - } } } diff --git a/src/main/java/cn/somkit/fmt/action/LoggingAction.java b/src/main/java/cn/somkit/fmt/action/LoggingAction.java index f1e2a6f..098eb5a 100644 --- a/src/main/java/cn/somkit/fmt/action/LoggingAction.java +++ b/src/main/java/cn/somkit/fmt/action/LoggingAction.java @@ -1,38 +1,15 @@ package cn.somkit.fmt.action; -import cn.metona.cache.Cache; -import cn.metona.mq.util.MetonaMQUtil; -import cn.somkit.fmt.socket.WebSocketServerHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.ResponseBody; @Controller @RequestMapping("/logging") public class LoggingAction { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private Cache cache; - @GetMapping("/index") public String index() throws Exception{ return "logging"; } - - @ResponseBody - @PostMapping("/close") - public void close(Boolean closed) throws Exception { - cache.put("closed", String.valueOf(closed)); - logger.info("Metona MQ Mini Pro 停止消费者(顺序消息)..."); - if(closed){ - MetonaMQUtil.stopOrderedConsuming(); - } - } } diff --git a/src/main/java/cn/somkit/fmt/config/LogMonitorConfig.java b/src/main/java/cn/somkit/fmt/config/LogMonitorConfig.java new file mode 100644 index 0000000..a514d7f --- /dev/null +++ b/src/main/java/cn/somkit/fmt/config/LogMonitorConfig.java @@ -0,0 +1,50 @@ +package cn.somkit.fmt.config; + +import cn.metona.mq.MQToolkit; +import cn.metona.mq.consumer.MessageConsumer; +import cn.metona.mq.utils.MonitorUtils; +import cn.somkit.fmt.socket.SocketManage; +import jakarta.annotation.PostConstruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.List; + +@Component +public class LogMonitorConfig { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private static final String Queue_Name = "log-monitor"; + + private static final String Consumer_Id = "log-monitor-consumer"; + + @PostConstruct + public void run(){ + //创建日志监控队列 + MQToolkit.createQueue(Queue_Name, Boolean.FALSE); + //创建日志监控消费者 + MessageConsumer consumer = MQToolkit.createConsumer(Consumer_Id, Queue_Name, message -> { + List list = SocketManage.all(); + if(list != null && !list.isEmpty()){ + list.forEach(session -> { + if(session.isOpen()){ + try { + session.sendMessage(new TextMessage(message.body())); + } catch (IOException e) { + logger.error("发送消息失败:", e); + } + } + }); + } + }); + //启动消费者 + consumer.start(); + // 显示队列统计 + MonitorUtils.printQueueStats(Queue_Name); + } +} diff --git a/src/main/java/cn/somkit/fmt/filter/LogStashFilter.java b/src/main/java/cn/somkit/fmt/filter/LogStashFilter.java index 87fe46d..1215ed8 100644 --- a/src/main/java/cn/somkit/fmt/filter/LogStashFilter.java +++ b/src/main/java/cn/somkit/fmt/filter/LogStashFilter.java @@ -6,8 +6,8 @@ import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; import cn.hutool.core.date.DateUtil; import cn.hutool.json.JSONUtil; -import cn.metona.mq.exception.MessageSendException; -import cn.metona.mq.util.MetonaMQUtil; +import cn.metona.mq.MQToolkit; +import cn.metona.mq.core.MessageQueue; import cn.somkit.fmt.entity.LoggerMessage; import java.time.Instant; @@ -15,6 +15,8 @@ import java.time.ZoneId; public class LogStashFilter extends Filter { + private static final String Queue_Name = "log-monitor"; + Level level; public LogStashFilter() { @@ -29,12 +31,10 @@ public class LogStashFilter extends Filter { DateUtil.format(Instant.ofEpochMilli(e.getTimeStamp()).atZone(ZoneId.systemDefault()).toLocalDateTime(), "yyyy-MM-dd HH:mm:ss.SSS") ); - try { - if(MetonaMQUtil.isInitialized() && MetonaMQUtil.isOrderedConsumerRunning()){ - MetonaMQUtil.send("log-topic", "log-monitor", JSONUtil.toJsonStr(msg)); - } - } catch (MessageSendException ex) { - System.out.println("发送消息队列失败"); + //发送日志信息到日志监控队列 + MessageQueue.QueueStats queueStats = MQToolkit.getQueueStats(Queue_Name); + if(queueStats != null && queueStats.isRunning() && !queueStats.isPaused()){ + MQToolkit.sendMessage(Queue_Name, "log.monitor", JSONUtil.toJsonStr(msg)); } return FilterReply.NEUTRAL; } diff --git a/src/main/java/cn/somkit/fmt/socket/SocketManage.java b/src/main/java/cn/somkit/fmt/socket/SocketManage.java new file mode 100644 index 0000000..910eafe --- /dev/null +++ b/src/main/java/cn/somkit/fmt/socket/SocketManage.java @@ -0,0 +1,31 @@ +package cn.somkit.fmt.socket; + +import org.springframework.web.socket.WebSocketSession; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SocketManage { + + private SocketManage() { + } + + private static final Map onlineMap = new ConcurrentHashMap<>(); + + public static void put(WebSocketSession session){ + onlineMap.put(session.getId(), session); + } + + public static WebSocketSession get(String sessionId){ + return onlineMap.get(sessionId); + } + + public static void remove(String sessionId){ + onlineMap.remove(sessionId); + } + + public static List all(){ + return onlineMap.values().stream().toList(); + } +} diff --git a/src/main/java/cn/somkit/fmt/socket/WebSocketServerHandler.java b/src/main/java/cn/somkit/fmt/socket/WebSocketServerHandler.java index 23446d4..5fd5a5d 100644 --- a/src/main/java/cn/somkit/fmt/socket/WebSocketServerHandler.java +++ b/src/main/java/cn/somkit/fmt/socket/WebSocketServerHandler.java @@ -1,70 +1,29 @@ package cn.somkit.fmt.socket; -import cn.hutool.core.util.StrUtil; -import cn.metona.cache.Cache; -import cn.metona.mq.consumer.MessageListener; -import cn.metona.mq.core.Message; -import cn.metona.mq.util.MetonaMQUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.socket.*; -import java.io.IOException; - @Component public class WebSocketServerHandler implements WebSocketHandler { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private Cache cache; - @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { - push(session); - } - - private void push(WebSocketSession session) throws IOException { - boolean closed = StrUtil.isNotBlank(cache.get("closed")) && Boolean.parseBoolean(cache.get("closed")); - if(!closed){ - try { - if(MetonaMQUtil.isInitialized() && !MetonaMQUtil.isOrderedConsumerRunning()){ - logger.info("Metona MQ Mini Pro 订阅主题(log-topic)(顺序消息)..."); - MetonaMQUtil.subscribeOrdered("log-topic", new MessageListener() { - @Override - public void onMessage(Message message) { - try { - session.sendMessage(new TextMessage(message.getBody())); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }); - - logger.info("Metona MQ Mini Pro 启动消费者(顺序消息)..."); - MetonaMQUtil.startOrderedConsuming(); - } - } catch (Exception e) { - logger.error("Metona MQ Mini Pro 异常", e); - } - } + SocketManage.put(session); } @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { - push(session); + } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - + SocketManage.remove(session.getId()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { - + SocketManage.remove(session.getId()); } @Override diff --git a/src/main/java/cn/somkit/fmt/utils/LoggerQueue.java b/src/main/java/cn/somkit/fmt/utils/LoggerQueue.java deleted file mode 100644 index 07754d7..0000000 --- a/src/main/java/cn/somkit/fmt/utils/LoggerQueue.java +++ /dev/null @@ -1,28 +0,0 @@ -package cn.somkit.fmt.utils; - -import cn.somkit.fmt.entity.LoggerMessage; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -public final class LoggerQueue { - private static final LoggerQueue INSTANCE = new LoggerQueue(); - private final BlockingQueue queue = new LinkedBlockingQueue<>(100000); - - public static LoggerQueue getInstance() { - return INSTANCE; - } - - public void push(LoggerMessage msg) { - queue.offer(msg); // 非阻塞插入 - } - - public LoggerMessage poll() { - try { - return queue.take(); // 阻塞直到有数据 - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } - } -} diff --git a/src/main/resources/templates/logging.html b/src/main/resources/templates/logging.html index d3c0224..5f54a3f 100644 --- a/src/main/resources/templates/logging.html +++ b/src/main/resources/templates/logging.html @@ -14,18 +14,6 @@ let ws = null; - let logMonitor = async (closed = false) => { - const options = { - url: Fmt.ctx() + '/logging/close', - data: {closed: closed}, - method: 'post' - }; - await Fmt.axios(options).then(() => {}).catch((err) => console.error(err)); - if(ws){ - ws.send('发送日志'); - } - } - const logger = new LogMonitorAdaptive('#logContainer', { theme: 'dark', maxLines: 10000, @@ -43,8 +31,8 @@ showLevel: true, // 是否显示日志级别标签 wordWrap: true, // 日志内容是否自动换行(true=换行,false=横向滚动) //暂停/继续 回调函数 - onTogglePause: async (isPaused) => { - await logMonitor(isPaused); + onTogglePause: (isPaused) => { + }, onCreated: () => { console.log('日志容器已创建'); @@ -59,10 +47,6 @@ return false; } - ws.onopen = function () { - - } - ws.onmessage = function (event) { if(event.data){ let data = JSON.parse(event.data); From 71ca306336337b79b7f46029535cf27e58544b43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=B4=AB=E5=BD=B1233?= <1440196015@qq.com> Date: Fri, 25 Jul 2025 14:34:42 +0800 Subject: [PATCH 2/2] =?UTF-8?q?v2.1.1=20=E6=9B=B4=E6=96=B0=EF=BC=9A=20=20?= =?UTF-8?q?=20=20=201=E3=80=81=E5=8D=87=E7=BA=A7metona-mq-mini-pro?= =?UTF-8?q?=E5=88=B02.0.0=EF=BC=8C=E9=87=8D=E6=9E=84=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E8=8E=B7=E5=8F=96=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 版本记录/readme.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/版本记录/readme.md b/版本记录/readme.md index 1fbf09e..55e8473 100644 --- a/版本记录/readme.md +++ b/版本记录/readme.md @@ -32,4 +32,8 @@ > v2.1.0 ``` 引入metona-mq-mini-pro消息队列,重构实时日志获取方式 +``` +> v2.1.1 +``` + 升级metona-mq-mini-pro到2.0.0,重构实时日志获取方式 ``` \ No newline at end of file