Commit e2bf158a authored by luxurong's avatar luxurong
Browse files

ack

parent 405881b2
No related merge requests found
Showing with 11 additions and 21 deletions
+11 -21
...@@ -25,16 +25,16 @@ public abstract class AbsAck implements Ack { ...@@ -25,16 +25,16 @@ public abstract class AbsAck implements Ack {
private final int period; private final int period;
private final Consumer<Boolean> consumer; private final Runnable cleaner;
protected AbsAck(int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Consumer<Boolean> consumer) { protected AbsAck(int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Runnable cleaner) {
this.maxRetrySize = maxRetrySize; this.maxRetrySize = maxRetrySize;
this.period = period; this.period = period;
this.runnable = runnable; this.runnable = runnable;
this.ackManager = ackManager; this.ackManager = ackManager;
this.consumer= consumer; this.cleaner= cleaner;
} }
@Override @Override
...@@ -42,7 +42,6 @@ public abstract class AbsAck implements Ack { ...@@ -42,7 +42,6 @@ public abstract class AbsAck implements Ack {
if (++count <= maxRetrySize+1 && !died ) { if (++count <= maxRetrySize+1 && !died ) {
try { try {
log.info("task retry send ..........."); log.info("task retry send ...........");
consumer.accept(false);
runnable.run(); runnable.run();
ackManager.addAck(this); ackManager.addAck(this);
} catch (Exception e) { } catch (Exception e) {
...@@ -51,7 +50,7 @@ public abstract class AbsAck implements Ack { ...@@ -51,7 +50,7 @@ public abstract class AbsAck implements Ack {
} }
else { else {
consumer.accept(true); cleaner.run();
} }
} }
......
package io.github.quickmsg.common.ack; package io.github.quickmsg.common.ack;
import java.util.function.Consumer;
/** /**
* @author luxurong * @author luxurong
*/ */
...@@ -11,7 +9,7 @@ public class RetryAck extends AbsAck { ...@@ -11,7 +9,7 @@ public class RetryAck extends AbsAck {
private final long id; private final long id;
public RetryAck(long id, int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Consumer<Boolean> consumer) { public RetryAck(long id, int maxRetrySize, int period, Runnable runnable, AckManager ackManager, Runnable consumer) {
super(maxRetrySize, period, runnable, ackManager,consumer); super(maxRetrySize, period, runnable, ackManager,consumer);
this.id = id; this.id = id;
} }
......
...@@ -271,18 +271,12 @@ public class MqttChannel { ...@@ -271,18 +271,12 @@ public class MqttChannel {
Increase the reference count of bytebuf, and the reference count of retrybytebuf is 2 Increase the reference count of bytebuf, and the reference count of retrybytebuf is 2
mqttChannel.write() method releases a reference count. mqttChannel.write() method releases a reference count.
*/ */
Runnable runnable = () -> mqttChannel.write(Mono.just(mqttMessage)).subscribe(); MqttMessage reply = getReplyMqttMessage(mqttMessage);
Consumer<Boolean> consumer = bool -> {
if (bool) { Runnable runnable = () -> mqttChannel.write(Mono.just(reply)).subscribe();
MessageUtils.safeRelease(mqttMessage); Runnable cleaner = () -> MessageUtils.safeRelease(reply);;
} else { Ack ack = new RetryAck(mqttChannel.generateId(reply.fixedHeader().messageType(), getMessageId(reply)),
if (mqttMessage instanceof MqttPublishMessage) { 5, 5, runnable, mqttChannel.getTimeAckManager(), cleaner);
((MqttPublishMessage) mqttMessage).retain();
}
}
};
Ack ack = new RetryAck(mqttChannel.generateId(mqttMessage.fixedHeader().messageType(), getMessageId(mqttMessage)),
5, 5, runnable, mqttChannel.getTimeAckManager(), consumer);
ack.start(); ack.start();
return mqttChannel.write(Mono.just(mqttMessage)).then(); return mqttChannel.write(Mono.just(mqttMessage)).then();
} else { } else {
...@@ -308,7 +302,6 @@ public class MqttChannel { ...@@ -308,7 +302,6 @@ public class MqttChannel {
} else { } else {
return mqttMessage; return mqttMessage;
} }
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment