v2.1.1 更新:

1、升级metona-mq-mini-pro到2.0.0,重构实时日志获取方式
This commit is contained in:
2025-07-25 14:32:21 +08:00
parent b37da657fa
commit 5274830cc6
9 changed files with 97 additions and 130 deletions

View File

@@ -10,7 +10,7 @@
</parent> </parent>
<groupId>cn.somkit</groupId> <groupId>cn.somkit</groupId>
<artifactId>fmt</artifactId> <artifactId>fmt</artifactId>
<version>2.1.0</version> <version>2.1.1</version>
<name>fmt</name> <name>fmt</name>
<description>File Manage System for by SpringBoot</description> <description>File Manage System for by SpringBoot</description>
<properties> <properties>
@@ -55,7 +55,7 @@
<dependency> <dependency>
<groupId>cn.metona</groupId> <groupId>cn.metona</groupId>
<artifactId>metona-mq-mini-pro</artifactId> <artifactId>metona-mq-mini-pro</artifactId>
<version>1.0.2</version> <version>2.0.0</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@@ -1,6 +1,5 @@
package cn.somkit.fmt; package cn.somkit.fmt;
import cn.metona.mq.util.MetonaMQUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
@@ -13,11 +12,6 @@ public class FmtApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(FmtApplication.class, args); SpringApplication.run(FmtApplication.class, args);
if(!MetonaMQUtil.isInitialized()){
logger.info("Metona MQ Mini Pro 初始化...");
MetonaMQUtil.init();
}
} }
} }

View File

@@ -1,38 +1,15 @@
package cn.somkit.fmt.action; package cn.somkit.fmt.action;
import cn.metona.cache.Cache;
import cn.metona.mq.util.MetonaMQUtil;
import cn.somkit.fmt.socket.WebSocketServerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller @Controller
@RequestMapping("/logging") @RequestMapping("/logging")
public class LoggingAction { public class LoggingAction {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private Cache<String, String> cache;
@GetMapping("/index") @GetMapping("/index")
public String index() throws Exception{ public String index() throws Exception{
return "logging"; return "logging";
} }
@ResponseBody
@PostMapping("/close")
public void close(Boolean closed) throws Exception {
cache.put("closed", String.valueOf(closed));
logger.info("Metona MQ Mini Pro 停止消费者(顺序消息)...");
if(closed){
MetonaMQUtil.stopOrderedConsuming();
}
}
} }

View File

@@ -0,0 +1,50 @@
package cn.somkit.fmt.config;
import cn.metona.mq.MQToolkit;
import cn.metona.mq.consumer.MessageConsumer;
import cn.metona.mq.utils.MonitorUtils;
import cn.somkit.fmt.socket.SocketManage;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.List;
@Component
public class LogMonitorConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final String Queue_Name = "log-monitor";
private static final String Consumer_Id = "log-monitor-consumer";
@PostConstruct
public void run(){
//创建日志监控队列
MQToolkit.createQueue(Queue_Name, Boolean.FALSE);
//创建日志监控消费者
MessageConsumer consumer = MQToolkit.createConsumer(Consumer_Id, Queue_Name, message -> {
List<WebSocketSession> list = SocketManage.all();
if(list != null && !list.isEmpty()){
list.forEach(session -> {
if(session.isOpen()){
try {
session.sendMessage(new TextMessage(message.body()));
} catch (IOException e) {
logger.error("发送消息失败:", e);
}
}
});
}
});
//启动消费者
consumer.start();
// 显示队列统计
MonitorUtils.printQueueStats(Queue_Name);
}
}

View File

@@ -6,8 +6,8 @@ import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply; import ch.qos.logback.core.spi.FilterReply;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import cn.metona.mq.exception.MessageSendException; import cn.metona.mq.MQToolkit;
import cn.metona.mq.util.MetonaMQUtil; import cn.metona.mq.core.MessageQueue;
import cn.somkit.fmt.entity.LoggerMessage; import cn.somkit.fmt.entity.LoggerMessage;
import java.time.Instant; import java.time.Instant;
@@ -15,6 +15,8 @@ import java.time.ZoneId;
public class LogStashFilter extends Filter<ILoggingEvent> { public class LogStashFilter extends Filter<ILoggingEvent> {
private static final String Queue_Name = "log-monitor";
Level level; Level level;
public LogStashFilter() { public LogStashFilter() {
@@ -29,12 +31,10 @@ public class LogStashFilter extends Filter<ILoggingEvent> {
DateUtil.format(Instant.ofEpochMilli(e.getTimeStamp()).atZone(ZoneId.systemDefault()).toLocalDateTime(), DateUtil.format(Instant.ofEpochMilli(e.getTimeStamp()).atZone(ZoneId.systemDefault()).toLocalDateTime(),
"yyyy-MM-dd HH:mm:ss.SSS") "yyyy-MM-dd HH:mm:ss.SSS")
); );
try { //发送日志信息到日志监控队列
if(MetonaMQUtil.isInitialized() && MetonaMQUtil.isOrderedConsumerRunning()){ MessageQueue.QueueStats queueStats = MQToolkit.getQueueStats(Queue_Name);
MetonaMQUtil.send("log-topic", "log-monitor", JSONUtil.toJsonStr(msg)); if(queueStats != null && queueStats.isRunning() && !queueStats.isPaused()){
} MQToolkit.sendMessage(Queue_Name, "log.monitor", JSONUtil.toJsonStr(msg));
} catch (MessageSendException ex) {
System.out.println("发送消息队列失败");
} }
return FilterReply.NEUTRAL; return FilterReply.NEUTRAL;
} }

View File

@@ -0,0 +1,31 @@
package cn.somkit.fmt.socket;
import org.springframework.web.socket.WebSocketSession;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SocketManage {
private SocketManage() {
}
private static final Map<String, WebSocketSession> onlineMap = new ConcurrentHashMap<>();
public static void put(WebSocketSession session){
onlineMap.put(session.getId(), session);
}
public static WebSocketSession get(String sessionId){
return onlineMap.get(sessionId);
}
public static void remove(String sessionId){
onlineMap.remove(sessionId);
}
public static List<WebSocketSession> all(){
return onlineMap.values().stream().toList();
}
}

View File

@@ -1,70 +1,29 @@
package cn.somkit.fmt.socket; package cn.somkit.fmt.socket;
import cn.hutool.core.util.StrUtil;
import cn.metona.cache.Cache;
import cn.metona.mq.consumer.MessageListener;
import cn.metona.mq.core.Message;
import cn.metona.mq.util.MetonaMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.socket.*; import org.springframework.web.socket.*;
import java.io.IOException;
@Component @Component
public class WebSocketServerHandler implements WebSocketHandler { public class WebSocketServerHandler implements WebSocketHandler {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private Cache<String, String> cache;
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
push(session); SocketManage.put(session);
}
private void push(WebSocketSession session) throws IOException {
boolean closed = StrUtil.isNotBlank(cache.get("closed")) && Boolean.parseBoolean(cache.get("closed"));
if(!closed){
try {
if(MetonaMQUtil.isInitialized() && !MetonaMQUtil.isOrderedConsumerRunning()){
logger.info("Metona MQ Mini Pro 订阅主题log-topic顺序消息...");
MetonaMQUtil.subscribeOrdered("log-topic", new MessageListener() {
@Override
public void onMessage(Message message) {
try {
session.sendMessage(new TextMessage(message.getBody()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
logger.info("Metona MQ Mini Pro 启动消费者(顺序消息)...");
MetonaMQUtil.startOrderedConsuming();
}
} catch (Exception e) {
logger.error("Metona MQ Mini Pro 异常", e);
}
}
} }
@Override @Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
push(session);
} }
@Override @Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
SocketManage.remove(session.getId());
} }
@Override @Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
SocketManage.remove(session.getId());
} }
@Override @Override

View File

@@ -1,28 +0,0 @@
package cn.somkit.fmt.utils;
import cn.somkit.fmt.entity.LoggerMessage;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public final class LoggerQueue {
private static final LoggerQueue INSTANCE = new LoggerQueue();
private final BlockingQueue<LoggerMessage> queue = new LinkedBlockingQueue<>(100000);
public static LoggerQueue getInstance() {
return INSTANCE;
}
public void push(LoggerMessage msg) {
queue.offer(msg); // 非阻塞插入
}
public LoggerMessage poll() {
try {
return queue.take(); // 阻塞直到有数据
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
}

View File

@@ -14,18 +14,6 @@
let ws = null; let ws = null;
let logMonitor = async (closed = false) => {
const options = {
url: Fmt.ctx() + '/logging/close',
data: {closed: closed},
method: 'post'
};
await Fmt.axios(options).then(() => {}).catch((err) => console.error(err));
if(ws){
ws.send('发送日志');
}
}
const logger = new LogMonitorAdaptive('#logContainer', { const logger = new LogMonitorAdaptive('#logContainer', {
theme: 'dark', theme: 'dark',
maxLines: 10000, maxLines: 10000,
@@ -43,8 +31,8 @@
showLevel: true, // 是否显示日志级别标签 showLevel: true, // 是否显示日志级别标签
wordWrap: true, // 日志内容是否自动换行true=换行false=横向滚动) wordWrap: true, // 日志内容是否自动换行true=换行false=横向滚动)
//暂停/继续 回调函数 //暂停/继续 回调函数
onTogglePause: async (isPaused) => { onTogglePause: (isPaused) => {
await logMonitor(isPaused);
}, },
onCreated: () => { onCreated: () => {
console.log('日志容器已创建'); console.log('日志容器已创建');
@@ -59,10 +47,6 @@
return false; return false;
} }
ws.onopen = function () {
}
ws.onmessage = function (event) { ws.onmessage = function (event) {
if(event.data){ if(event.data){
let data = JSON.parse(event.data); let data = JSON.parse(event.data);