Merge remote-tracking branch 'origin/develop'
This commit is contained in:
4
pom.xml
4
pom.xml
@@ -10,7 +10,7 @@
|
|||||||
</parent>
|
</parent>
|
||||||
<groupId>cn.somkit</groupId>
|
<groupId>cn.somkit</groupId>
|
||||||
<artifactId>fmt</artifactId>
|
<artifactId>fmt</artifactId>
|
||||||
<version>2.1.0</version>
|
<version>2.1.1</version>
|
||||||
<name>fmt</name>
|
<name>fmt</name>
|
||||||
<description>File Manage System for by SpringBoot</description>
|
<description>File Manage System for by SpringBoot</description>
|
||||||
<properties>
|
<properties>
|
||||||
@@ -55,7 +55,7 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>cn.metona</groupId>
|
<groupId>cn.metona</groupId>
|
||||||
<artifactId>metona-mq-mini-pro</artifactId>
|
<artifactId>metona-mq-mini-pro</artifactId>
|
||||||
<version>1.0.2</version>
|
<version>2.0.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
package cn.somkit.fmt;
|
package cn.somkit.fmt;
|
||||||
|
|
||||||
import cn.metona.mq.util.MetonaMQUtil;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
@@ -13,11 +12,6 @@ public class FmtApplication {
|
|||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
SpringApplication.run(FmtApplication.class, args);
|
SpringApplication.run(FmtApplication.class, args);
|
||||||
|
|
||||||
if(!MetonaMQUtil.isInitialized()){
|
|
||||||
logger.info("Metona MQ Mini Pro 初始化...");
|
|
||||||
MetonaMQUtil.init();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,38 +1,15 @@
|
|||||||
package cn.somkit.fmt.action;
|
package cn.somkit.fmt.action;
|
||||||
|
|
||||||
import cn.metona.cache.Cache;
|
|
||||||
import cn.metona.mq.util.MetonaMQUtil;
|
|
||||||
import cn.somkit.fmt.socket.WebSocketServerHandler;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Controller;
|
import org.springframework.stereotype.Controller;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
import org.springframework.web.bind.annotation.PostMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.ResponseBody;
|
|
||||||
|
|
||||||
@Controller
|
@Controller
|
||||||
@RequestMapping("/logging")
|
@RequestMapping("/logging")
|
||||||
public class LoggingAction {
|
public class LoggingAction {
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private Cache<String, String> cache;
|
|
||||||
|
|
||||||
@GetMapping("/index")
|
@GetMapping("/index")
|
||||||
public String index() throws Exception{
|
public String index() throws Exception{
|
||||||
return "logging";
|
return "logging";
|
||||||
}
|
}
|
||||||
|
|
||||||
@ResponseBody
|
|
||||||
@PostMapping("/close")
|
|
||||||
public void close(Boolean closed) throws Exception {
|
|
||||||
cache.put("closed", String.valueOf(closed));
|
|
||||||
logger.info("Metona MQ Mini Pro 停止消费者(顺序消息)...");
|
|
||||||
if(closed){
|
|
||||||
MetonaMQUtil.stopOrderedConsuming();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
50
src/main/java/cn/somkit/fmt/config/LogMonitorConfig.java
Normal file
50
src/main/java/cn/somkit/fmt/config/LogMonitorConfig.java
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package cn.somkit.fmt.config;
|
||||||
|
|
||||||
|
import cn.metona.mq.MQToolkit;
|
||||||
|
import cn.metona.mq.consumer.MessageConsumer;
|
||||||
|
import cn.metona.mq.utils.MonitorUtils;
|
||||||
|
import cn.somkit.fmt.socket.SocketManage;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.socket.TextMessage;
|
||||||
|
import org.springframework.web.socket.WebSocketSession;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class LogMonitorConfig {
|
||||||
|
|
||||||
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||||
|
|
||||||
|
private static final String Queue_Name = "log-monitor";
|
||||||
|
|
||||||
|
private static final String Consumer_Id = "log-monitor-consumer";
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void run(){
|
||||||
|
//创建日志监控队列
|
||||||
|
MQToolkit.createQueue(Queue_Name, Boolean.FALSE);
|
||||||
|
//创建日志监控消费者
|
||||||
|
MessageConsumer consumer = MQToolkit.createConsumer(Consumer_Id, Queue_Name, message -> {
|
||||||
|
List<WebSocketSession> list = SocketManage.all();
|
||||||
|
if(list != null && !list.isEmpty()){
|
||||||
|
list.forEach(session -> {
|
||||||
|
if(session.isOpen()){
|
||||||
|
try {
|
||||||
|
session.sendMessage(new TextMessage(message.body()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("发送消息失败:", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
//启动消费者
|
||||||
|
consumer.start();
|
||||||
|
// 显示队列统计
|
||||||
|
MonitorUtils.printQueueStats(Queue_Name);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,8 +6,8 @@ import ch.qos.logback.core.filter.Filter;
|
|||||||
import ch.qos.logback.core.spi.FilterReply;
|
import ch.qos.logback.core.spi.FilterReply;
|
||||||
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.DateUtil;
|
||||||
import cn.hutool.json.JSONUtil;
|
import cn.hutool.json.JSONUtil;
|
||||||
import cn.metona.mq.exception.MessageSendException;
|
import cn.metona.mq.MQToolkit;
|
||||||
import cn.metona.mq.util.MetonaMQUtil;
|
import cn.metona.mq.core.MessageQueue;
|
||||||
import cn.somkit.fmt.entity.LoggerMessage;
|
import cn.somkit.fmt.entity.LoggerMessage;
|
||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
@@ -15,6 +15,8 @@ import java.time.ZoneId;
|
|||||||
|
|
||||||
public class LogStashFilter extends Filter<ILoggingEvent> {
|
public class LogStashFilter extends Filter<ILoggingEvent> {
|
||||||
|
|
||||||
|
private static final String Queue_Name = "log-monitor";
|
||||||
|
|
||||||
Level level;
|
Level level;
|
||||||
|
|
||||||
public LogStashFilter() {
|
public LogStashFilter() {
|
||||||
@@ -29,12 +31,10 @@ public class LogStashFilter extends Filter<ILoggingEvent> {
|
|||||||
DateUtil.format(Instant.ofEpochMilli(e.getTimeStamp()).atZone(ZoneId.systemDefault()).toLocalDateTime(),
|
DateUtil.format(Instant.ofEpochMilli(e.getTimeStamp()).atZone(ZoneId.systemDefault()).toLocalDateTime(),
|
||||||
"yyyy-MM-dd HH:mm:ss.SSS")
|
"yyyy-MM-dd HH:mm:ss.SSS")
|
||||||
);
|
);
|
||||||
try {
|
//发送日志信息到日志监控队列
|
||||||
if(MetonaMQUtil.isInitialized() && MetonaMQUtil.isOrderedConsumerRunning()){
|
MessageQueue.QueueStats queueStats = MQToolkit.getQueueStats(Queue_Name);
|
||||||
MetonaMQUtil.send("log-topic", "log-monitor", JSONUtil.toJsonStr(msg));
|
if(queueStats != null && queueStats.isRunning() && !queueStats.isPaused()){
|
||||||
}
|
MQToolkit.sendMessage(Queue_Name, "log.monitor", JSONUtil.toJsonStr(msg));
|
||||||
} catch (MessageSendException ex) {
|
|
||||||
System.out.println("发送消息队列失败");
|
|
||||||
}
|
}
|
||||||
return FilterReply.NEUTRAL;
|
return FilterReply.NEUTRAL;
|
||||||
}
|
}
|
||||||
|
|||||||
31
src/main/java/cn/somkit/fmt/socket/SocketManage.java
Normal file
31
src/main/java/cn/somkit/fmt/socket/SocketManage.java
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
package cn.somkit.fmt.socket;
|
||||||
|
|
||||||
|
import org.springframework.web.socket.WebSocketSession;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
public class SocketManage {
|
||||||
|
|
||||||
|
private SocketManage() {
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Map<String, WebSocketSession> onlineMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public static void put(WebSocketSession session){
|
||||||
|
onlineMap.put(session.getId(), session);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static WebSocketSession get(String sessionId){
|
||||||
|
return onlineMap.get(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void remove(String sessionId){
|
||||||
|
onlineMap.remove(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<WebSocketSession> all(){
|
||||||
|
return onlineMap.values().stream().toList();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,70 +1,29 @@
|
|||||||
package cn.somkit.fmt.socket;
|
package cn.somkit.fmt.socket;
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import cn.metona.cache.Cache;
|
|
||||||
import cn.metona.mq.consumer.MessageListener;
|
|
||||||
import cn.metona.mq.core.Message;
|
|
||||||
import cn.metona.mq.util.MetonaMQUtil;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.web.socket.*;
|
import org.springframework.web.socket.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
public class WebSocketServerHandler implements WebSocketHandler {
|
public class WebSocketServerHandler implements WebSocketHandler {
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private Cache<String, String> cache;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||||
push(session);
|
SocketManage.put(session);
|
||||||
}
|
|
||||||
|
|
||||||
private void push(WebSocketSession session) throws IOException {
|
|
||||||
boolean closed = StrUtil.isNotBlank(cache.get("closed")) && Boolean.parseBoolean(cache.get("closed"));
|
|
||||||
if(!closed){
|
|
||||||
try {
|
|
||||||
if(MetonaMQUtil.isInitialized() && !MetonaMQUtil.isOrderedConsumerRunning()){
|
|
||||||
logger.info("Metona MQ Mini Pro 订阅主题(log-topic)(顺序消息)...");
|
|
||||||
MetonaMQUtil.subscribeOrdered("log-topic", new MessageListener() {
|
|
||||||
@Override
|
|
||||||
public void onMessage(Message message) {
|
|
||||||
try {
|
|
||||||
session.sendMessage(new TextMessage(message.getBody()));
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.info("Metona MQ Mini Pro 启动消费者(顺序消息)...");
|
|
||||||
MetonaMQUtil.startOrderedConsuming();
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Metona MQ Mini Pro 异常", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
|
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
|
||||||
push(session);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
|
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
|
||||||
|
SocketManage.remove(session.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
|
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
|
||||||
|
SocketManage.remove(session.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -1,28 +0,0 @@
|
|||||||
package cn.somkit.fmt.utils;
|
|
||||||
|
|
||||||
import cn.somkit.fmt.entity.LoggerMessage;
|
|
||||||
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
|
|
||||||
public final class LoggerQueue {
|
|
||||||
private static final LoggerQueue INSTANCE = new LoggerQueue();
|
|
||||||
private final BlockingQueue<LoggerMessage> queue = new LinkedBlockingQueue<>(100000);
|
|
||||||
|
|
||||||
public static LoggerQueue getInstance() {
|
|
||||||
return INSTANCE;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void push(LoggerMessage msg) {
|
|
||||||
queue.offer(msg); // 非阻塞插入
|
|
||||||
}
|
|
||||||
|
|
||||||
public LoggerMessage poll() {
|
|
||||||
try {
|
|
||||||
return queue.take(); // 阻塞直到有数据
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -14,18 +14,6 @@
|
|||||||
|
|
||||||
let ws = null;
|
let ws = null;
|
||||||
|
|
||||||
let logMonitor = async (closed = false) => {
|
|
||||||
const options = {
|
|
||||||
url: Fmt.ctx() + '/logging/close',
|
|
||||||
data: {closed: closed},
|
|
||||||
method: 'post'
|
|
||||||
};
|
|
||||||
await Fmt.axios(options).then(() => {}).catch((err) => console.error(err));
|
|
||||||
if(ws){
|
|
||||||
ws.send('发送日志');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const logger = new LogMonitorAdaptive('#logContainer', {
|
const logger = new LogMonitorAdaptive('#logContainer', {
|
||||||
theme: 'dark',
|
theme: 'dark',
|
||||||
maxLines: 10000,
|
maxLines: 10000,
|
||||||
@@ -43,8 +31,8 @@
|
|||||||
showLevel: true, // 是否显示日志级别标签
|
showLevel: true, // 是否显示日志级别标签
|
||||||
wordWrap: true, // 日志内容是否自动换行(true=换行,false=横向滚动)
|
wordWrap: true, // 日志内容是否自动换行(true=换行,false=横向滚动)
|
||||||
//暂停/继续 回调函数
|
//暂停/继续 回调函数
|
||||||
onTogglePause: async (isPaused) => {
|
onTogglePause: (isPaused) => {
|
||||||
await logMonitor(isPaused);
|
|
||||||
},
|
},
|
||||||
onCreated: () => {
|
onCreated: () => {
|
||||||
console.log('日志容器已创建');
|
console.log('日志容器已创建');
|
||||||
@@ -59,10 +47,6 @@
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ws.onopen = function () {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
ws.onmessage = function (event) {
|
ws.onmessage = function (event) {
|
||||||
if(event.data){
|
if(event.data){
|
||||||
let data = JSON.parse(event.data);
|
let data = JSON.parse(event.data);
|
||||||
|
|||||||
@@ -32,4 +32,8 @@
|
|||||||
> v2.1.0
|
> v2.1.0
|
||||||
```
|
```
|
||||||
引入metona-mq-mini-pro消息队列,重构实时日志获取方式
|
引入metona-mq-mini-pro消息队列,重构实时日志获取方式
|
||||||
|
```
|
||||||
|
> v2.1.1
|
||||||
|
```
|
||||||
|
升级metona-mq-mini-pro到2.0.0,重构实时日志获取方式
|
||||||
```
|
```
|
||||||
Reference in New Issue
Block a user