Commit 760ab2aa authored by luxurong's avatar luxurong
Browse files

新增判断

parent afa50c56
Showing with 65 additions and 9 deletions
+65 -9
...@@ -9,7 +9,7 @@ public class ClusterNode1 { ...@@ -9,7 +9,7 @@ public class ClusterNode1 {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = Bootstrap.builder() Bootstrap bootstrap = Bootstrap.builder()
.rootLevel(Level.INFO) .rootLevel(Level.ERROR)
.websocketConfig( .websocketConfig(
BootstrapConfig.WebsocketConfig BootstrapConfig.WebsocketConfig
.builder() .builder()
......
package topic;
import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.github.quickmsg.core.spi.DefaultTopicRegistry;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;
/**
* @author luxurong
*/
public class TopicTest {
static ExecutorService service = Executors.newFixedThreadPool(100);
private static TopicRegistry topicRegistry = new DefaultTopicRegistry();
private static Map<Integer, MqttChannel> channelMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
CountDownLatch count = new CountDownLatch(500000);
for(int i=0;i<500000;i++){
service.execute(()->{
int index = new Random().nextInt(1000);
MqttChannel mqttChannel =channelMap.computeIfAbsent(index,in->{
MqttChannel mqttChannel1=new MqttChannel();
mqttChannel1.setTopics(new CopyOnWriteArraySet<>());
return mqttChannel1;
});
SubscribeTopic subscribeTopic=new SubscribeTopic(String.valueOf(index), MqttQoS.AT_MOST_ONCE,mqttChannel);
topicRegistry.registrySubscribeTopic(subscribeTopic);
topicRegistry.getAllTopics();
count.countDown();
});
}
try {
count.await();
Map<String, Set<MqttChannel>> topics = topicRegistry.getAllTopics();
System.out.println(topics);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
...@@ -188,7 +188,7 @@ public class MqttChannel { ...@@ -188,7 +188,7 @@ public class MqttChannel {
*/ */
public Mono<Void> write(MqttMessage mqttMessage, boolean retry) { public Mono<Void> write(MqttMessage mqttMessage, boolean retry) {
// http本地mock // http本地mock
if (this.getIsMock()) { if (this.getIsMock() && !this.active()) {
return Mono.empty(); return Mono.empty();
} else { } else {
return MqttMessageSink.MQTT_SINK.sendMessage(mqttMessage, this, retry, replyMqttMessageMap); return MqttMessageSink.MQTT_SINK.sendMessage(mqttMessage, this, retry, replyMqttMessageMap);
...@@ -346,7 +346,7 @@ public class MqttChannel { ...@@ -346,7 +346,7 @@ public class MqttChannel {
@Override @Override
public String toString() { public String toString() {
return "MqttChannel{" + return "MqttChannel{" +
" address='" + this.connection.address().toString() + '\'' + // " address='" + this.connection.address().toString() + '\'' +
", clientIdentifier='" + clientIdentifier + '\'' + ", clientIdentifier='" + clientIdentifier + '\'' +
", status=" + status + ", status=" + status +
", keepalive=" + keepalive + ", keepalive=" + keepalive +
......
...@@ -79,8 +79,8 @@ public enum Event { ...@@ -79,8 +79,8 @@ public enum Event {
/** /**
* body * body
* *
* @param mqttChannel {@link MqttChannel } * @param mqttChannel {@link MqttChannel }
* @param body {@link Object } * @param body {@link Object }
* @return ByteBuf * @return ByteBuf
*/ */
public abstract ByteBuf writeBody(MqttChannel mqttChannel, Object body); public abstract ByteBuf writeBody(MqttChannel mqttChannel, Object body);
...@@ -92,6 +92,9 @@ public enum Event { ...@@ -92,6 +92,9 @@ public enum Event {
message message
, System.currentTimeMillis(), Boolean.FALSE), , System.currentTimeMillis(), Boolean.FALSE),
receiveContext); receiveContext);
if (message instanceof MqttPublishMessage) {
((MqttPublishMessage) message).release();
}
} }
} }
...@@ -45,11 +45,11 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor { ...@@ -45,11 +45,11 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor {
@Override @Override
public <C extends Configuration> void chooseProtocol(MqttChannel mqttChannel, SmqttMessage<MqttMessage> smqttMessage, ReceiveContext<C> receiveContext) { public <C extends Configuration> void chooseProtocol(MqttChannel mqttChannel, SmqttMessage<MqttMessage> smqttMessage, ReceiveContext<C> receiveContext) {
MqttMessage mqttMessage = smqttMessage.getMessage(); MqttMessage mqttMessage = smqttMessage.getMessage();
log.info(" 【{}】【{}】 【{}】",
Thread.currentThread().getName(),
mqttMessage.fixedHeader().messageType(),
mqttChannel);
if (mqttMessage.decoderResult() != null && (mqttMessage.decoderResult().isSuccess())) { if (mqttMessage.decoderResult() != null && (mqttMessage.decoderResult().isSuccess())) {
log.info(" 【{}】【{}】 【{}】",
Thread.currentThread().getName(),
mqttMessage.fixedHeader().messageType(),
mqttChannel);
Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType())) Optional.ofNullable(types.get(mqttMessage.fixedHeader().messageType()))
.ifPresent(protocol -> protocol .ifPresent(protocol -> protocol
.doParseProtocol(smqttMessage, mqttChannel) .doParseProtocol(smqttMessage, mqttChannel)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment