Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
小 白蛋
Mqtt Cluster
Commits
504464e3
Commit
504464e3
authored
3 years ago
by
Easy
Browse files
Options
Download
Plain Diff
Merge branch 'release/bug-fix' into main_acl
parents
5638c90f
593b62af
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java
+41
-4
...n/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java
with
41 additions
and
4 deletions
+41
-4
smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/src/main/java/io/github/quickmsg/source/mqtt/MqttSourceBean.java
+
41
-
4
View file @
504464e3
...
...
@@ -3,6 +3,7 @@ package io.github.quickmsg.source.mqtt;
import
com.hivemq.client.mqtt.MqttClient
;
import
com.hivemq.client.mqtt.datatypes.MqttQos
;
import
com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient
;
import
com.hivemq.client.mqtt.mqtt3.lifecycle.Mqtt3ClientDisconnectedContext
;
import
com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder
;
import
com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck
;
import
io.github.quickmsg.common.rule.source.Source
;
...
...
@@ -11,10 +12,12 @@ import io.github.quickmsg.common.utils.JacksonUtil;
import
io.netty.util.internal.StringUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
java.time.LocalTime
;
import
java.util.Map
;
import
java.util.Objects
;
import
java.util.Optional
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.TimeUnit
;
/**
...
...
@@ -50,6 +53,27 @@ public class MqttSourceBean implements SourceBean {
.
identifier
(
clientId
)
.
serverHost
(
host
)
.
serverPort
(
port
)
.
addDisconnectedListener
(
context
->
{
context
.
getReconnector
()
.
reconnect
(
true
)
// always reconnect (includes calling disconnect)
.
delay
(
2L
*
context
.
getReconnector
().
getAttempts
(),
TimeUnit
.
SECONDS
);
// linear scaling delay
})
.
addDisconnectedListener
(
context
->
{
final
Mqtt3ClientDisconnectedContext
context3
=
(
Mqtt3ClientDisconnectedContext
)
context
;
String
userName
=
sourceParam
.
get
(
"userName"
).
toString
();
String
passWord
=
sourceParam
.
get
(
"passWord"
).
toString
();
if
(!
StringUtil
.
isNullOrEmpty
(
userName
)
&&
!
StringUtil
.
isNullOrEmpty
(
passWord
))
{
context3
.
getReconnector
()
.
connectWith
()
.
simpleAuth
()
.
username
(
userName
)
.
password
(
passWord
.
getBytes
())
.
applySimpleAuth
()
.
applyConnect
();
}
})
.
addConnectedListener
(
context
->
log
.
info
(
"mqtt client connected "
+
LocalTime
.
now
()))
.
addDisconnectedListener
(
context
->
log
.
error
(
"mqtt client disconnected "
+
LocalTime
.
now
()))
.
buildAsync
();
Mqtt3ConnectBuilder
.
Send
<
CompletableFuture
<
Mqtt3ConnAck
>>
completableFutureSend
=
client
.
connectWith
();
...
...
@@ -63,7 +87,6 @@ public class MqttSourceBean implements SourceBean {
.
password
(
passWord
.
getBytes
())
.
applySimpleAuth
();
}
}
completableFutureSend
...
...
@@ -92,10 +115,10 @@ public class MqttSourceBean implements SourceBean {
@Override
public
void
transmit
(
Map
<
String
,
Object
>
object
)
{
String
topic
=
(
String
)
object
.
get
(
"topic"
);
Object
msg
=
object
.
get
(
"msg"
);
String
bytes
=
msg
instanceof
Map
?
JacksonUtil
.
map2Json
((
Map
<?
extends
Object
,
?
extends
Object
>)
msg
):
msg
.
toString
();
Object
msg
=
object
.
get
(
"msg"
);
String
bytes
=
msg
instanceof
Map
?
JacksonUtil
.
map2Json
((
Map
<?
extends
Object
,
?
extends
Object
>)
msg
)
:
msg
.
toString
();
Boolean
retain
=
(
Boolean
)
object
.
get
(
"retain"
);
Integer
qos
=
Optional
.
ofNullable
((
Integer
)
object
.
get
(
"qos"
)).
orElse
(
0
);
Integer
qos
=
Optional
.
ofNullable
((
Integer
)
object
.
get
(
"qos"
)).
orElse
(
0
);
client
.
publishWith
()
.
topic
(
topic
)
.
payload
(
bytes
.
getBytes
())
...
...
@@ -117,4 +140,18 @@ public class MqttSourceBean implements SourceBean {
client
.
disconnect
();
}
}
private
static
CompletableFuture
<
byte
[]>
getOAuthToken
()
{
return
CompletableFuture
.
supplyAsync
(()
->
{
try
{
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
TimeUnit
.
SECONDS
.
sleep
(
1
);
System
.
out
.
println
(
"OAuth server is slow to respond ..."
);
}
}
catch
(
final
InterruptedException
e
)
{
e
.
printStackTrace
();
}
return
new
byte
[]{
1
,
2
,
3
};
});
}
}
This diff is collapsed.
Click to expand it.
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment
Menu
Projects
Groups
Snippets
Help