Commit 9f2c27d0 authored by luxurong's avatar luxurong
Browse files

数据库优化

No related merge requests found
Showing with 3 additions and 6 deletions
+3 -6
...@@ -41,8 +41,6 @@ public class MqttChannel { ...@@ -41,8 +41,6 @@ public class MqttChannel {
private ChannelStatus status; private ChannelStatus status;
private long activeTime;
private long authTime; private long authTime;
private long connectTime; private long connectTime;
...@@ -101,7 +99,6 @@ public class MqttChannel { ...@@ -101,7 +99,6 @@ public class MqttChannel {
mqttChannel.setReplyMqttMessageMap(new ConcurrentHashMap<>()); mqttChannel.setReplyMqttMessageMap(new ConcurrentHashMap<>());
mqttChannel.setMqttMessageSink(new MqttMessageSink()); mqttChannel.setMqttMessageSink(new MqttMessageSink());
mqttChannel.setQos2MsgCache(new ConcurrentHashMap<>()); mqttChannel.setQos2MsgCache(new ConcurrentHashMap<>());
mqttChannel.setActiveTime(System.currentTimeMillis());
mqttChannel.setConnection(connection); mqttChannel.setConnection(connection);
mqttChannel.setStatus(ChannelStatus.INIT); mqttChannel.setStatus(ChannelStatus.INIT);
mqttChannel.setAddress(connection.address().toString()); mqttChannel.setAddress(connection.address().toString());
......
...@@ -66,15 +66,15 @@ public class ConnectProtocol implements Protocol<MqttConnectMessage> { ...@@ -66,15 +66,15 @@ public class ConnectProtocol implements Protocol<MqttConnectMessage> {
byte mqttVersion = (byte) mqttConnectVariableHeader.version(); byte mqttVersion = (byte) mqttConnectVariableHeader.version();
PasswordAuthentication passwordAuthentication = mqttReceiveContext.getPasswordAuthentication(); PasswordAuthentication passwordAuthentication = mqttReceiveContext.getPasswordAuthentication();
/*check clientIdentifier exist*/ /*check clientIdentifier exist*/
MqttChannel existMqttChannel = channelRegistry.get(clientIdentifier);
if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) { if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) {
if (channelRegistry.exists(clientIdentifier)) { if (existMqttChannel != null && existMqttChannel.getStatus() == ChannelStatus.ONLINE) {
return mqttChannel.write( return mqttChannel.write(
MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttVersion), MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttVersion),
false).then(mqttChannel.close()); false).then(mqttChannel.close());
} }
} else { } else {
MqttChannel existMqttChannel = channelRegistry.get(clientIdentifier); if (existMqttChannel != null && existMqttChannel.getStatus() == ChannelStatus.ONLINE) {
if (existMqttChannel != null) {
if (System.currentTimeMillis() - existMqttChannel.getConnectTime() > (mqttReceiveContext.getConfiguration().getNotKickSecond() * 1000)) { if (System.currentTimeMillis() - existMqttChannel.getConnectTime() > (mqttReceiveContext.getConfiguration().getNotKickSecond() * 1000)) {
existMqttChannel.close().subscribe(); existMqttChannel.close().subscribe();
} else { } else {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment