"...main/java/git@git.gitsec.cn:lixiaofang/MeterSphere.git" did not exist on "pr@dev@fix_测试计划性能测试sql"
Unverified Commit 7d36a2aa authored by quickmsg's avatar quickmsg Committed by Gitee
Browse files

!5 1.1.12发布

Merge pull request !5 from quickmsg/release/bug-fix
parents ca5729ee fc344ffd
Showing with 176 additions and 136 deletions
+176 -136
## ![image](icon/logo.png) SMQTT开源的MQTT消息代理Broker
SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor3反应堆模型,支持单机部署,支持容器化部署,具备低延迟,高吞吐量,支持百万TCP连接,同时支持多种协议交互,是一款非常优秀的消息中间件!
SMQTT基于reactor-netty(spring-webflux底层依赖)
开发,底层采用Reactor3反应堆模型,支持单机部署,支持容器化部署,具备低延迟,高吞吐量,支持百万TCP连接,同时支持多种协议交互,是一款非常优秀的消息中间件!
## smqtt目前拥有的功能如下:
![架构图](icon/component.png)
![架构图](icon/component.png)
1. 消息质量等级实现(支持qos0,qos1,qos2)
2. topicFilter支持
- topic分级(test/test)
- +支持(单层匹配)
- #支持(多层匹配)
- # 支持(多层匹配)
3. 会话消息
- 默认内存存储
- 支持持久化(redis/db)
4. 保留消息
- 默认内存存储
- 支持持久化(redis/db)
- 默认内存存储
- 支持持久化(redis/db)
5. 遗嘱消息
> 设备掉线时候触发
> 设备掉线时候触发
6. 客户端认证
- 支持spi注入外部认证
- 支持spi注入外部认证
7. tls加密
- 支持tls加密(mqtt端口/http端口)
- 支持tls加密(mqtt端口/http端口)
8. websocket协议支持x
> 使用mqtt over websocket
> 使用mqtt over websocket
9. http协议交互
- 支持http接口推送消息
- 支持spi扩展http接口
10. SPI接口扩展支持
- 消息管理接口(会话消息/保留消息管理)
- 通道管理接口 (管理系统的客户端连接)
- 认证接口 (用于自定义外部认证)
- 拦截器 (用户自定义拦截消息)
- 消息管理接口(会话消息/保留消息管理)
- 通道管理接口 (管理系统的客户端连接)
- 认证接口 (用于自定义外部认证)
- 拦截器 (用户自定义拦截消息)
11. 集群支持(gossip协议实现)
12. 容器化支持
12. 容器化支持
> 默认镜像最新tag: 1ssqq1lxr/smqtt
13. 持久化支持(session 保留消息)
14. 规则引擎支持
15. 支持springboot starter启动
16. 管理后台
> 请参考smqtt文档如何启动管理后台
17. grafana监控集成
- 支持influxdb
- 支持prometheus
## 尝试一下
> 大家不要恶意链接,谢谢!
......@@ -60,6 +65,7 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor
### main方式启动
引入依赖
```markdown
<!--smqtt依赖 -->
<dependency>
......@@ -85,69 +91,55 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor
```markdown
Bootstrap bootstrap = Bootstrap.builder()
.rootLevel(Level.DEBUG)
.tcpConfig(
BootstrapConfig
.TcpConfig
.builder()
.port(8888)
.username("smqtt")
.password("smqtt")
.build())
.httpConfig(
BootstrapConfig
.HttpConfig
.builder()
.enable(true)
.accessLog(true)
.build())
.clusterConfig(
BootstrapConfig.
ClusterConfig
.builder()
.enable(true)
.namespace("smqtt")
.node("node-1")
.port(7773)
.url("127.0.0.1:7771,127.0.0.1:7772").
build())
.build()
.startAwait();
.rootLevel(Level.DEBUG)
.tcpConfig(
BootstrapConfig .TcpConfig .builder()
.port(8888)
.username("smqtt")
.password("smqtt")
.build())
.httpConfig(
BootstrapConfig .HttpConfig .builder()
.enable(true)
.accessLog(true)
.build())
.clusterConfig(
BootstrapConfig. ClusterConfig .builder()
.enable(true)
.namespace("smqtt")
.node("node-1")
.port(7773)
.url("127.0.0.1:7771,127.0.0.1:7772"). build())
.build()
.startAwait();
```
- 非阻塞式启动服务:
```markdown
Bootstrap bootstrap = Bootstrap.builder()
.rootLevel(Level.DEBUG)
.tcpConfig(
BootstrapConfig
.TcpConfig
.builder()
.port(8888)
.username("smqtt")
.password("smqtt")
.build())
.httpConfig(
BootstrapConfig
.HttpConfig
.builder()
.enable(true)
.accessLog(true)
.build())
.clusterConfig(
BootstrapConfig.
ClusterConfig
.builder()
.enable(true)
.namespace("smqtt")
.node("node-1")
.port(7773)
.url("127.0.0.1:7771,127.0.0.1:7772").
build())
.build()
.start().block();
Bootstrap bootstrap = Bootstrap.builder()
.rootLevel(Level.DEBUG)
.tcpConfig(
BootstrapConfig .TcpConfig .builder()
.port(8888)
.username("smqtt")
.password("smqtt")
.build())
.httpConfig(
BootstrapConfig .HttpConfig .builder()
.enable(true)
.accessLog(true)
.build())
.clusterConfig(
BootstrapConfig. ClusterConfig .builder()
.enable(true)
.namespace("smqtt")
.node("node-1")
.port(7773)
.url("127.0.0.1:7771,127.0.0.1:7772"). build())
.build()
.start().block();
```
### jar方式
......@@ -168,11 +160,8 @@ SMQTT基于reactor-netty(spring-webflux底层依赖)开发,底层采用Reactor
java -jar smqtt-bootstrap-1.0.1-SNAPSHOT.jar <config.yaml路径>
```
### docker 方式
拉取镜像
```
......@@ -189,18 +178,15 @@ docker run -it -p 1883:1883 1ssqq1lxr/smqtt
启动镜像使用自定义配置(同上准备配置文件config.yaml)
```
# 启动服务
docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1ssqq1lxr/smqtt
```
### springboot方式
1. 引入依赖
```markdown
<dependency>
<groupId>io.github.quickmsg</groupId>
......@@ -212,23 +198,25 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1
2. 启动类Application上添加注解 ` @EnableMqttServer`
3. 配置application.yml文件
> properties也支持,但是需要自己转换,没有提供demo文件
> properties也支持,但是需要自己转换,没有提供demo文件
[config.yaml](config/config.yaml)
4. 启动springboot服务服务即可
5. 如果引入的是spring-boot-starter-parent的管理包,如果启动报错,则需要添加以下依赖
```xml
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.9</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.9</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>1.0.10</version>
</dependency>
```
## 官网地址
......@@ -239,43 +227,44 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1
[wiki地址](https://wiki.smqtt.cc/)
## 管理后台
![image](icon/admin.png)
![image](icon/admin.png)
## 监控页面
### Mqtt监控
![image](icon/application.png)
![image](icon/application.png)
### Jvm监控
![image](icon/jvm.png)
### Netty监控
![image](icon/netty.png)
![image](icon/netty.png)
## License
[Apache License, Version 2.0](LICENSE)
## 友情链接
[一款非常好用的IOT平台:thinglinks](https://github.com/mqttsnet/thinglinks)
一款非常好用的IOT平台 thinglinks:
- [Github](https://github.com/mqttsnet/thinglinks)
- [Gitee](https://gitee.com/mqttsnet/thinglinks)
## 相关技术文档
- [reactor3](https://projectreactor.io/docs/core/release/reference/)
- [reactor-netty](https://projectreactor.io/docs/netty/1.0.12/reference/index.html)
## 麻烦关注下公众号!
![image](icon/icon.jpg)
![image](icon/icon.jpg)
- 添加微信号`Lemon877164954`,拉入smqtt官方交流群
- 加入qq群 `700152283`
smqtt:
logLevel: INFO # 系统日志
tcp: # tcp配置
connectModel: UNIQUE # UNIQUE 唯一 KICK 互踢
connectModel: KICK # UNIQUE 唯一 KICK 互踢
notKickSecond: 30 # KICK互踢模式生效, 单位秒, 指定时间内客户端不互踢, 避免客户端自动连接持续互踢
port: 1883 # mqtt端口号
username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口
password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口
......
......@@ -9,7 +9,7 @@ import java.util.function.Consumer;
/**
* @author luxurong
*/
@Slf4j
@Slf4j(topic = "ack")
public abstract class AbsAck implements Ack {
private final int maxRetrySize;
......
......@@ -45,6 +45,8 @@ public class MqttChannel {
private long authTime;
private long connectTime;
private boolean sessionPersistent;
private Will will;
......@@ -273,7 +275,8 @@ public class MqttChannel {
MqttMessage reply = getReplyMqttMessage(mqttMessage);
Runnable runnable = () -> mqttChannel.write(Mono.just(reply)).subscribe();
Runnable cleaner = () -> MessageUtils.safeRelease(reply);;
Runnable cleaner = () -> MessageUtils.safeRelease(reply);
;
Ack ack = new RetryAck(mqttChannel.generateId(reply.fixedHeader().messageType(), getMessageId(reply)),
5, 5, runnable, mqttChannel.getTimeAckManager(), cleaner);
ack.start();
......
......@@ -32,7 +32,7 @@ public class BootstrapConfig {
smqttConfig.setLogLevel("INFO");
bootstrapConfig.setSmqttConfig(smqttConfig);
smqttConfig.setClusterConfig(ClusterConfig.builder()
.enable(false).build());
.enable(false).build());
smqttConfig.setHttpConfig(HttpConfig.builder()
.enable(false).build());
smqttConfig.setWebsocketConfig(WebsocketConfig.builder()
......@@ -114,6 +114,12 @@ public class BootstrapConfig {
@Builder.Default
private ConnectModel connectModel = ConnectModel.UNIQUE;
/**
* 不互踢时间 默认60s
*/
@Builder.Default
private Integer notKickSecond = 60;
/**
* 端口
*/
......@@ -177,7 +183,7 @@ public class BootstrapConfig {
/**
* 单个连接读写字节限制
*/
private String channelReadWriteSize;
private String channelReadWriteSize;
/**
......@@ -437,7 +443,6 @@ public class BootstrapConfig {
/**
* 指标配置
*/
@Data
@Builder
......
package io.github.quickmsg.common.config;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
......@@ -8,6 +9,7 @@ import lombok.NoArgsConstructor;
*/
@Data
@NoArgsConstructor
@Builder
public class SslContext {
private String crt;
......
......@@ -43,6 +43,7 @@ public class HeapMqttMessage {
}
private Object getJsonObject(String body) {
body=body.replaceAll("\r|\n|\t", "");
if (body.startsWith("{") && body.endsWith("}")) {
return JacksonUtil.json2Bean(body, JsonMap.class);
} else if (body.startsWith("[") && body.endsWith("]")) {
......
......@@ -5,6 +5,8 @@ package io.github.quickmsg.common.utils;
*/
public class TopicRegexUtils {
public static final TopicRegexUtils instance = new TopicRegexUtils();
public static String regexTopic(String topic) {
if (topic.startsWith("$")) {
topic = "\\" + topic;
......@@ -15,5 +17,8 @@ public class TopicRegexUtils {
.replaceAll("#", "(.+)") + "$";
}
public String regularTopic(String topic) {
return TopicRegexUtils.regexTopic(topic);
}
}
......@@ -77,6 +77,7 @@ public class Bootstrap {
}
}
Optional.ofNullable(tcpConfig.getConnectModel()).ifPresent(mqttConfiguration::setConnectModel);
Optional.ofNullable(tcpConfig.getNotKickSecond()).ifPresent(mqttConfiguration::setNotKickSecond);
Optional.ofNullable(tcpConfig.getPort()).ifPresent(mqttConfiguration::setPort);
Optional.ofNullable(tcpConfig.getLowWaterMark()).ifPresent(mqttConfiguration::setLowWaterMark);
Optional.ofNullable(tcpConfig.getHighWaterMark()).ifPresent(mqttConfiguration::setHighWaterMark);
......
......@@ -98,7 +98,7 @@ public abstract class AbstractReceiveContext<T extends Configuration> implements
this.messageRegistry.startUp(abstractConfiguration.getEnvironmentMap());
this.metricManager = metricManager(abstractConfiguration.getMeterConfig());
Optional.ofNullable(abstractConfiguration.getSourceDefinitions()).ifPresent(sourceDefinitions -> sourceDefinitions.forEach(SourceManager::loadSource));
this.timeAckManager = new TimeAckManager(20, TimeUnit.MILLISECONDS,512);
this.timeAckManager = new TimeAckManager(20, TimeUnit.MILLISECONDS,50);
}
private TrafficHandlerLoader trafficHandlerLoader() {
......
......@@ -37,6 +37,8 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon
private ConnectModel connectModel;
private Integer notKickSecond;
private PasswordAuthentication reactivePasswordAuth = (u, p, c) -> true;
private Integer bossThreadSize = Runtime.getRuntime().availableProcessors();
......@@ -57,7 +59,7 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon
private BootstrapConfig.ClusterConfig clusterConfig;
private BootstrapConfig.MeterConfig meterConfig ;
private BootstrapConfig.MeterConfig meterConfig;
private List<RuleChainDefinition> ruleChainDefinitions;
......
......@@ -30,7 +30,10 @@ public class MqttReceiveContext extends AbstractReceiveContext<MqttConfiguration
.inbound()
.receiveObject()
.cast(MqttMessage.class)
.doOnError(throwable -> log.error("on connect error",throwable))
.onErrorContinue((throwable, o) -> {
log.error("on message error {}",o,throwable);
})
.filter(mqttMessage -> mqttMessage.decoderResult().isSuccess())
.subscribe(mqttMessage -> this.accept(mqttChannel, new SmqttMessage<>(mqttMessage,System.currentTimeMillis(),Boolean.FALSE)));
}
......
......@@ -72,8 +72,16 @@ public class ConnectProtocol implements Protocol<MqttConnectMessage> {
false).then(mqttChannel.close());
}
} else {
Optional.ofNullable( channelRegistry.get(clientIdentifier))
.ifPresent(ch->ch.close().subscribe());
MqttChannel existMqttChannel = channelRegistry.get(clientIdentifier);
if (existMqttChannel != null) {
if (System.currentTimeMillis() - existMqttChannel.getConnectTime() > (mqttReceiveContext.getConfiguration().getNotKickSecond() * 1000)) {
existMqttChannel.close().subscribe();
} else {
return mqttChannel.write(
MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED),
false).then(mqttChannel.close());
}
}
}
/*protocol version support*/
if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version()
......@@ -96,6 +104,7 @@ public class ConnectProtocol implements Protocol<MqttConnectMessage> {
.build());
}
mqttChannel.setAuthTime(System.currentTimeMillis());
mqttChannel.setConnectTime(System.currentTimeMillis());
mqttChannel.setKeepalive(mqttConnectVariableHeader.keepAliveTimeSeconds());
mqttChannel.setSessionPersistent(!mqttConnectVariableHeader.isCleanSession());
mqttChannel.setStatus(ChannelStatus.ONLINE);
......
......@@ -45,24 +45,23 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor {
@Override
public <C extends Configuration> void chooseProtocol(MqttChannel mqttChannel, SmqttMessage<MqttMessage> smqttMessage, ReceiveContext<C> receiveContext) {
MqttMessage mqttMessage = smqttMessage.getMessage();
if (mqttMessage.decoderResult() != null && (mqttMessage.decoderResult().isSuccess())) {
log.info(" 【{}】【{}】 【{}】",
Thread.currentThread().getName(),
mqttMessage.fixedHeader().messageType(),
mqttChannel);
Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType()))
.ifPresent(protocol -> protocol
.doParseProtocol(smqttMessage, mqttChannel)
.contextWrite(context -> context.putNonNull(ReceiveContext.class, receiveContext))
.subscribeOn(scheduler)
.subscribe(aVoid -> {
}, error -> {
log.error("channel {} chooseProtocol: {} error {}", mqttChannel, mqttMessage, error.getMessage());
ReactorNetty.safeRelease(mqttMessage.payload());
}, () -> ReactorNetty.safeRelease(mqttMessage.payload())));
} else {
log.error("chooseProtocol {} error mqttMessage {} ", mqttChannel, mqttMessage);
}
log.info(" 【{}】【{}】 【{}】",
Thread.currentThread().getName(),
mqttMessage.fixedHeader().messageType(),
mqttChannel);
Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType()))
.ifPresent(protocol -> protocol
.doParseProtocol(smqttMessage, mqttChannel)
.contextWrite(context -> context.putNonNull(ReceiveContext.class, receiveContext))
.subscribeOn(scheduler)
.onErrorContinue(((throwable, o) -> {
}))
.subscribe(aVoid -> {
}, error -> {
log.error("channel {} chooseProtocol: {} error {}", mqttChannel, mqttMessage, error.getMessage());
ReactorNetty.safeRelease(mqttMessage.payload());
}, () -> ReactorNetty.safeRelease(mqttMessage.payload())));
}
......
......@@ -4,11 +4,11 @@ import io.github.quickmsg.common.config.Configuration;
import io.github.quickmsg.common.config.SslContext;
import io.github.quickmsg.core.mqtt.MqttConfiguration;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import lombok.extern.slf4j.Slf4j;
import reactor.netty.tcp.SslProvider;
import reactor.netty.tcp.TcpServer;
import reactor.netty.tcp.TcpSslContextSpec;
import java.io.File;
import java.util.Map;
......@@ -25,7 +25,7 @@ public class AbstractSslHandler {
File cert;
File key;
SslContext sslContext = configuration.getSslContext();
if (sslContext != null) {
if (sslContext != null && sslContext.getCrt() != null && sslContext.getKey() != null) {
cert = new File(sslContext.getCrt());
key = new File(sslContext.getKey());
......@@ -33,9 +33,10 @@ public class AbstractSslHandler {
SelfSignedCertificate ssc = new SelfSignedCertificate();
cert = ssc.certificate();
key = ssc.privateKey();
log.info("SelfSignedCertificate cert {} key {}",cert.getAbsolutePath(),key.getAbsolutePath());
}
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);
sslContextSpec.sslContext(sslContextBuilder);
TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key);
sslContextSpec.sslContext(tcpSslContextSpec);
}
} catch (Exception e) {
......@@ -48,7 +49,7 @@ public class AbstractSslHandler {
public TcpServer initTcpServer(MqttConfiguration mqttConfiguration) {
TcpServer server = TcpServer.create();
if (mqttConfiguration.getSsl()) {
server.secure(sslContextSpec -> this.secure(sslContextSpec, mqttConfiguration));
server = server.secure(sslContextSpec -> this.secure(sslContextSpec, mqttConfiguration));
}
if (mqttConfiguration.getOptions() != null) {
for (Map.Entry<String, Object> entry : mqttConfiguration.getOptions().entrySet()) {
......
......@@ -23,7 +23,7 @@
</pattern>
</encoder>
</appender>
<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
<appender name="system" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/smqtt.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
......@@ -33,6 +33,16 @@
</encoder>
</appender>
<appender name="ack" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/ack.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="reactor.netty" level="info"/>
<logger name="reactor.netty.http.client.HttpClient" level="debug"/>
<logger name="reactor.netty.channel.FluxReceive" level="info"/>
......@@ -47,8 +57,14 @@
<root level="INFO">
<appender-ref ref="stdout"/>
<appender-ref ref="file"/>
<appender-ref ref="system"/>
</root>
<logger name="ack" level="INFO" additivity="false">
<!-- <appender-ref ref="console" /> -->
<appender-ref ref="ack"/>
</logger>
</configuration>
\ No newline at end of file
package io.github.quickmsg.rule;
import io.github.quickmsg.common.utils.TopicRegexUtils;
import org.apache.commons.jexl3.*;
import reactor.util.context.ContextView;
......@@ -19,7 +20,8 @@ public interface RuleExecute {
* 执行
*
* @param context 上下文容器
\ */
* \
*/
void execute(ContextView context);
......@@ -34,6 +36,7 @@ public interface RuleExecute {
JexlExpression e = J_EXL_ENGINE.createExpression(script);
MapContext context = new MapContext();
mapContextConsumer.accept(context);
context.set("TopicRegexUtils", TopicRegexUtils.instance);
return e.evaluate(context);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment