Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 40f07bdd6e | |||
| 71ca306336 | |||
| 5274830cc6 |
4
pom.xml
4
pom.xml
@@ -10,7 +10,7 @@
|
||||
</parent>
|
||||
<groupId>cn.somkit</groupId>
|
||||
<artifactId>fmt</artifactId>
|
||||
<version>2.1.0</version>
|
||||
<version>2.1.1</version>
|
||||
<name>fmt</name>
|
||||
<description>File Manage System for by SpringBoot</description>
|
||||
<properties>
|
||||
@@ -55,7 +55,7 @@
|
||||
<dependency>
|
||||
<groupId>cn.metona</groupId>
|
||||
<artifactId>metona-mq-mini-pro</artifactId>
|
||||
<version>1.0.2</version>
|
||||
<version>2.0.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package cn.somkit.fmt;
|
||||
|
||||
import cn.metona.mq.util.MetonaMQUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
@@ -13,11 +12,6 @@ public class FmtApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(FmtApplication.class, args);
|
||||
|
||||
if(!MetonaMQUtil.isInitialized()){
|
||||
logger.info("Metona MQ Mini Pro 初始化...");
|
||||
MetonaMQUtil.init();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,38 +1,15 @@
|
||||
package cn.somkit.fmt.action;
|
||||
|
||||
import cn.metona.cache.Cache;
|
||||
import cn.metona.mq.util.MetonaMQUtil;
|
||||
import cn.somkit.fmt.socket.WebSocketServerHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Controller;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
|
||||
@Controller
|
||||
@RequestMapping("/logging")
|
||||
public class LoggingAction {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
private Cache<String, String> cache;
|
||||
|
||||
@GetMapping("/index")
|
||||
public String index() throws Exception{
|
||||
return "logging";
|
||||
}
|
||||
|
||||
@ResponseBody
|
||||
@PostMapping("/close")
|
||||
public void close(Boolean closed) throws Exception {
|
||||
cache.put("closed", String.valueOf(closed));
|
||||
logger.info("Metona MQ Mini Pro 停止消费者(顺序消息)...");
|
||||
if(closed){
|
||||
MetonaMQUtil.stopOrderedConsuming();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
50
src/main/java/cn/somkit/fmt/config/LogMonitorConfig.java
Normal file
50
src/main/java/cn/somkit/fmt/config/LogMonitorConfig.java
Normal file
@@ -0,0 +1,50 @@
|
||||
package cn.somkit.fmt.config;
|
||||
|
||||
import cn.metona.mq.MQToolkit;
|
||||
import cn.metona.mq.consumer.MessageConsumer;
|
||||
import cn.metona.mq.utils.MonitorUtils;
|
||||
import cn.somkit.fmt.socket.SocketManage;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.TextMessage;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
@Component
|
||||
public class LogMonitorConfig {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
private static final String Queue_Name = "log-monitor";
|
||||
|
||||
private static final String Consumer_Id = "log-monitor-consumer";
|
||||
|
||||
@PostConstruct
|
||||
public void run(){
|
||||
//创建日志监控队列
|
||||
MQToolkit.createQueue(Queue_Name, Boolean.FALSE);
|
||||
//创建日志监控消费者
|
||||
MessageConsumer consumer = MQToolkit.createConsumer(Consumer_Id, Queue_Name, message -> {
|
||||
List<WebSocketSession> list = SocketManage.all();
|
||||
if(list != null && !list.isEmpty()){
|
||||
list.forEach(session -> {
|
||||
if(session.isOpen()){
|
||||
try {
|
||||
session.sendMessage(new TextMessage(message.body()));
|
||||
} catch (IOException e) {
|
||||
logger.error("发送消息失败:", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
//启动消费者
|
||||
consumer.start();
|
||||
// 显示队列统计
|
||||
MonitorUtils.printQueueStats(Queue_Name);
|
||||
}
|
||||
}
|
||||
@@ -6,8 +6,8 @@ import ch.qos.logback.core.filter.Filter;
|
||||
import ch.qos.logback.core.spi.FilterReply;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import cn.metona.mq.exception.MessageSendException;
|
||||
import cn.metona.mq.util.MetonaMQUtil;
|
||||
import cn.metona.mq.MQToolkit;
|
||||
import cn.metona.mq.core.MessageQueue;
|
||||
import cn.somkit.fmt.entity.LoggerMessage;
|
||||
|
||||
import java.time.Instant;
|
||||
@@ -15,6 +15,8 @@ import java.time.ZoneId;
|
||||
|
||||
public class LogStashFilter extends Filter<ILoggingEvent> {
|
||||
|
||||
private static final String Queue_Name = "log-monitor";
|
||||
|
||||
Level level;
|
||||
|
||||
public LogStashFilter() {
|
||||
@@ -29,12 +31,10 @@ public class LogStashFilter extends Filter<ILoggingEvent> {
|
||||
DateUtil.format(Instant.ofEpochMilli(e.getTimeStamp()).atZone(ZoneId.systemDefault()).toLocalDateTime(),
|
||||
"yyyy-MM-dd HH:mm:ss.SSS")
|
||||
);
|
||||
try {
|
||||
if(MetonaMQUtil.isInitialized() && MetonaMQUtil.isOrderedConsumerRunning()){
|
||||
MetonaMQUtil.send("log-topic", "log-monitor", JSONUtil.toJsonStr(msg));
|
||||
}
|
||||
} catch (MessageSendException ex) {
|
||||
System.out.println("发送消息队列失败");
|
||||
//发送日志信息到日志监控队列
|
||||
MessageQueue.QueueStats queueStats = MQToolkit.getQueueStats(Queue_Name);
|
||||
if(queueStats != null && queueStats.isRunning() && !queueStats.isPaused()){
|
||||
MQToolkit.sendMessage(Queue_Name, "log.monitor", JSONUtil.toJsonStr(msg));
|
||||
}
|
||||
return FilterReply.NEUTRAL;
|
||||
}
|
||||
|
||||
31
src/main/java/cn/somkit/fmt/socket/SocketManage.java
Normal file
31
src/main/java/cn/somkit/fmt/socket/SocketManage.java
Normal file
@@ -0,0 +1,31 @@
|
||||
package cn.somkit.fmt.socket;
|
||||
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class SocketManage {
|
||||
|
||||
private SocketManage() {
|
||||
}
|
||||
|
||||
private static final Map<String, WebSocketSession> onlineMap = new ConcurrentHashMap<>();
|
||||
|
||||
public static void put(WebSocketSession session){
|
||||
onlineMap.put(session.getId(), session);
|
||||
}
|
||||
|
||||
public static WebSocketSession get(String sessionId){
|
||||
return onlineMap.get(sessionId);
|
||||
}
|
||||
|
||||
public static void remove(String sessionId){
|
||||
onlineMap.remove(sessionId);
|
||||
}
|
||||
|
||||
public static List<WebSocketSession> all(){
|
||||
return onlineMap.values().stream().toList();
|
||||
}
|
||||
}
|
||||
@@ -1,70 +1,29 @@
|
||||
package cn.somkit.fmt.socket;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.metona.cache.Cache;
|
||||
import cn.metona.mq.consumer.MessageListener;
|
||||
import cn.metona.mq.core.Message;
|
||||
import cn.metona.mq.util.MetonaMQUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.*;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@Component
|
||||
public class WebSocketServerHandler implements WebSocketHandler {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Autowired
|
||||
private Cache<String, String> cache;
|
||||
|
||||
@Override
|
||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||
push(session);
|
||||
}
|
||||
|
||||
private void push(WebSocketSession session) throws IOException {
|
||||
boolean closed = StrUtil.isNotBlank(cache.get("closed")) && Boolean.parseBoolean(cache.get("closed"));
|
||||
if(!closed){
|
||||
try {
|
||||
if(MetonaMQUtil.isInitialized() && !MetonaMQUtil.isOrderedConsumerRunning()){
|
||||
logger.info("Metona MQ Mini Pro 订阅主题(log-topic)(顺序消息)...");
|
||||
MetonaMQUtil.subscribeOrdered("log-topic", new MessageListener() {
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
try {
|
||||
session.sendMessage(new TextMessage(message.getBody()));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
logger.info("Metona MQ Mini Pro 启动消费者(顺序消息)...");
|
||||
MetonaMQUtil.startOrderedConsuming();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Metona MQ Mini Pro 异常", e);
|
||||
}
|
||||
}
|
||||
SocketManage.put(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
|
||||
push(session);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
|
||||
|
||||
SocketManage.remove(session.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
|
||||
|
||||
SocketManage.remove(session.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
package cn.somkit.fmt.utils;
|
||||
|
||||
import cn.somkit.fmt.entity.LoggerMessage;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
public final class LoggerQueue {
|
||||
private static final LoggerQueue INSTANCE = new LoggerQueue();
|
||||
private final BlockingQueue<LoggerMessage> queue = new LinkedBlockingQueue<>(100000);
|
||||
|
||||
public static LoggerQueue getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
public void push(LoggerMessage msg) {
|
||||
queue.offer(msg); // 非阻塞插入
|
||||
}
|
||||
|
||||
public LoggerMessage poll() {
|
||||
try {
|
||||
return queue.take(); // 阻塞直到有数据
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -14,18 +14,6 @@
|
||||
|
||||
let ws = null;
|
||||
|
||||
let logMonitor = async (closed = false) => {
|
||||
const options = {
|
||||
url: Fmt.ctx() + '/logging/close',
|
||||
data: {closed: closed},
|
||||
method: 'post'
|
||||
};
|
||||
await Fmt.axios(options).then(() => {}).catch((err) => console.error(err));
|
||||
if(ws){
|
||||
ws.send('发送日志');
|
||||
}
|
||||
}
|
||||
|
||||
const logger = new LogMonitorAdaptive('#logContainer', {
|
||||
theme: 'dark',
|
||||
maxLines: 10000,
|
||||
@@ -43,8 +31,8 @@
|
||||
showLevel: true, // 是否显示日志级别标签
|
||||
wordWrap: true, // 日志内容是否自动换行(true=换行,false=横向滚动)
|
||||
//暂停/继续 回调函数
|
||||
onTogglePause: async (isPaused) => {
|
||||
await logMonitor(isPaused);
|
||||
onTogglePause: (isPaused) => {
|
||||
|
||||
},
|
||||
onCreated: () => {
|
||||
console.log('日志容器已创建');
|
||||
@@ -59,10 +47,6 @@
|
||||
return false;
|
||||
}
|
||||
|
||||
ws.onopen = function () {
|
||||
|
||||
}
|
||||
|
||||
ws.onmessage = function (event) {
|
||||
if(event.data){
|
||||
let data = JSON.parse(event.data);
|
||||
|
||||
@@ -32,4 +32,8 @@
|
||||
> v2.1.0
|
||||
```
|
||||
引入metona-mq-mini-pro消息队列,重构实时日志获取方式
|
||||
```
|
||||
> v2.1.1
|
||||
```
|
||||
升级metona-mq-mini-pro到2.0.0,重构实时日志获取方式
|
||||
```
|
||||
Reference in New Issue
Block a user