v2.1.0 更新:
1、引入metona-mq-mini-pro消息队列,重构实时日志获取方式
This commit is contained in:
8
pom.xml
8
pom.xml
@@ -10,7 +10,7 @@
|
|||||||
</parent>
|
</parent>
|
||||||
<groupId>cn.somkit</groupId>
|
<groupId>cn.somkit</groupId>
|
||||||
<artifactId>fmt</artifactId>
|
<artifactId>fmt</artifactId>
|
||||||
<version>2.0.1</version>
|
<version>2.1.0</version>
|
||||||
<name>fmt</name>
|
<name>fmt</name>
|
||||||
<description>File Manage System for by SpringBoot</description>
|
<description>File Manage System for by SpringBoot</description>
|
||||||
<properties>
|
<properties>
|
||||||
@@ -51,6 +51,12 @@
|
|||||||
<artifactId>metona-cache-spring-boot-starter</artifactId>
|
<artifactId>metona-cache-spring-boot-starter</artifactId>
|
||||||
<version>1.0.0</version>
|
<version>1.0.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>cn.metona</groupId>
|
||||||
|
<artifactId>metona-mq-mini-pro</artifactId>
|
||||||
|
<version>1.0.1</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@@ -1,13 +1,26 @@
|
|||||||
package cn.somkit.fmt;
|
package cn.somkit.fmt;
|
||||||
|
|
||||||
|
import cn.metona.mq.consumer.MessageListener;
|
||||||
|
import cn.metona.mq.core.Message;
|
||||||
|
import cn.metona.mq.exception.MessageConsumeException;
|
||||||
|
import cn.metona.mq.util.MetonaMQUtil;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
public class FmtApplication {
|
public class FmtApplication {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(FmtApplication.class);
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
SpringApplication.run(FmtApplication.class, args);
|
SpringApplication.run(FmtApplication.class, args);
|
||||||
|
|
||||||
|
if(!MetonaMQUtil.isInitialized()){
|
||||||
|
logger.info("Metona MQ Mini Pro 初始化...");
|
||||||
|
MetonaMQUtil.init();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,10 @@
|
|||||||
package cn.somkit.fmt.action;
|
package cn.somkit.fmt.action;
|
||||||
|
|
||||||
import cn.metona.cache.Cache;
|
import cn.metona.cache.Cache;
|
||||||
|
import cn.metona.mq.util.MetonaMQUtil;
|
||||||
|
import cn.somkit.fmt.socket.WebSocketServerHandler;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Controller;
|
import org.springframework.stereotype.Controller;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
@@ -12,8 +16,10 @@ import org.springframework.web.bind.annotation.ResponseBody;
|
|||||||
@RequestMapping("/logging")
|
@RequestMapping("/logging")
|
||||||
public class LoggingAction {
|
public class LoggingAction {
|
||||||
|
|
||||||
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private Cache<String, Object> cache;
|
private Cache<String, String> cache;
|
||||||
|
|
||||||
@GetMapping("/index")
|
@GetMapping("/index")
|
||||||
public String index() throws Exception{
|
public String index() throws Exception{
|
||||||
@@ -23,6 +29,10 @@ public class LoggingAction {
|
|||||||
@ResponseBody
|
@ResponseBody
|
||||||
@PostMapping("/close")
|
@PostMapping("/close")
|
||||||
public void close(Boolean closed) throws Exception {
|
public void close(Boolean closed) throws Exception {
|
||||||
cache.put("closed", closed);
|
cache.put("closed", String.valueOf(closed));
|
||||||
|
logger.info("Metona MQ Mini Pro 停止消费者(顺序消息)...");
|
||||||
|
if(closed){
|
||||||
|
MetonaMQUtil.stopOrderedConsuming();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,8 +5,10 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
|
|||||||
import ch.qos.logback.core.filter.Filter;
|
import ch.qos.logback.core.filter.Filter;
|
||||||
import ch.qos.logback.core.spi.FilterReply;
|
import ch.qos.logback.core.spi.FilterReply;
|
||||||
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.DateUtil;
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import cn.metona.mq.exception.MessageSendException;
|
||||||
|
import cn.metona.mq.util.MetonaMQUtil;
|
||||||
import cn.somkit.fmt.entity.LoggerMessage;
|
import cn.somkit.fmt.entity.LoggerMessage;
|
||||||
import cn.somkit.fmt.utils.LoggerQueue;
|
|
||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
@@ -27,7 +29,14 @@ public class LogStashFilter extends Filter<ILoggingEvent> {
|
|||||||
DateUtil.format(Instant.ofEpochMilli(e.getTimeStamp()).atZone(ZoneId.systemDefault()).toLocalDateTime(),
|
DateUtil.format(Instant.ofEpochMilli(e.getTimeStamp()).atZone(ZoneId.systemDefault()).toLocalDateTime(),
|
||||||
"yyyy-MM-dd HH:mm:ss.SSS")
|
"yyyy-MM-dd HH:mm:ss.SSS")
|
||||||
);
|
);
|
||||||
LoggerQueue.getInstance().push(msg); // 单例阻塞队列
|
try {
|
||||||
|
System.out.println(MetonaMQUtil.getConsumerStatus());
|
||||||
|
if(MetonaMQUtil.isInitialized() && MetonaMQUtil.isOrderedConsumerRunning()){
|
||||||
|
MetonaMQUtil.send("log-topic", "log-monitor", JSONUtil.toJsonStr(msg));
|
||||||
|
}
|
||||||
|
} catch (MessageSendException ex) {
|
||||||
|
System.out.println("发送消息队列失败");
|
||||||
|
}
|
||||||
return FilterReply.NEUTRAL;
|
return FilterReply.NEUTRAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
package cn.somkit.fmt.socket;
|
package cn.somkit.fmt.socket;
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import cn.hutool.json.JSONUtil;
|
|
||||||
import cn.metona.cache.Cache;
|
import cn.metona.cache.Cache;
|
||||||
import cn.somkit.fmt.entity.LoggerMessage;
|
import cn.metona.mq.consumer.MessageListener;
|
||||||
import cn.somkit.fmt.utils.LoggerQueue;
|
import cn.metona.mq.core.Message;
|
||||||
|
import cn.metona.mq.util.MetonaMQUtil;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -19,7 +19,7 @@ public class WebSocketServerHandler implements WebSocketHandler {
|
|||||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private Cache<String, Object> cache;
|
private Cache<String, String> cache;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||||
@@ -27,10 +27,29 @@ public class WebSocketServerHandler implements WebSocketHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void push(WebSocketSession session) throws IOException {
|
private void push(WebSocketSession session) throws IOException {
|
||||||
while (StrUtil.isBlankIfStr(cache.get("closed")) || !Boolean.parseBoolean(String.valueOf(cache.get("closed")))) {
|
boolean closed = StrUtil.isNotBlank(cache.get("closed")) && Boolean.parseBoolean(cache.get("closed"));
|
||||||
LoggerMessage log = LoggerQueue.getInstance().poll();
|
if(!closed){
|
||||||
if(log != null){
|
try {
|
||||||
session.sendMessage(new TextMessage(JSONUtil.toJsonStr(log)));
|
if(MetonaMQUtil.isInitialized()){
|
||||||
|
logger.info("Metona MQ Mini Pro 订阅主题(log-topic)(顺序消息)...");
|
||||||
|
MetonaMQUtil.subscribeOrdered("log-topic", new MessageListener() {
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
try {
|
||||||
|
session.sendMessage(new TextMessage(message.getBody()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if(!MetonaMQUtil.isOrderedConsumerRunning()){
|
||||||
|
logger.info("Metona MQ Mini Pro 启动消费者(顺序消息)...");
|
||||||
|
MetonaMQUtil.startOrderedConsuming();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Metona MQ Mini Pro 异常", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user