Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
小 白蛋
Mqtt Cluster
Commits
0ec87021
Commit
0ec87021
authored
3 years ago
by
luxurong
Browse files
Options
Download
Email Patches
Plain Diff
pom
parent
fe6c858f
Changes
12
Hide whitespace changes
Inline
Side-by-side
Showing
12 changed files
smqtt-common/src/main/java/io/github/quickmsg/common/config/ConfigCheck.java
+23
-0
...in/java/io/github/quickmsg/common/config/ConfigCheck.java
smqtt-common/src/main/java/io/github/quickmsg/common/context/ContextHolder.java
+21
-0
...java/io/github/quickmsg/common/context/ContextHolder.java
smqtt-common/src/main/java/io/github/quickmsg/common/metric/MetricManager.java
+11
-7
.../java/io/github/quickmsg/common/metric/MetricManager.java
smqtt-core/src/main/java/io/github/quickmsg/core/DefaultTransport.java
+7
-6
...c/main/java/io/github/quickmsg/core/DefaultTransport.java
smqtt-core/src/main/java/io/github/quickmsg/core/http/AbstractHttpActor.java
+7
-1
.../java/io/github/quickmsg/core/http/AbstractHttpActor.java
smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/ClusterActor.java
+2
-1
...ava/io/github/quickmsg/core/http/actors/ClusterActor.java
smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/ConnectionActor.java
+2
-1
.../io/github/quickmsg/core/http/actors/ConnectionActor.java
smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/IsClusterActor.java
+2
-1
...a/io/github/quickmsg/core/http/actors/IsClusterActor.java
smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/SubscribeActor.java
+2
-1
...a/io/github/quickmsg/core/http/actors/SubscribeActor.java
smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
+6
-4
.../io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
+1
-1
.../java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java
+0
-1
.../main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java
with
84 additions
and
24 deletions
+84
-24
smqtt-common/src/main/java/io/github/quickmsg/common/config/ConfigCheck.java
0 → 100644
+
23
-
0
View file @
0ec87021
package
io.github.quickmsg.common.config
;
import
io.github.quickmsg.common.metric.MeterType
;
import
java.util.Objects
;
/**
* @author luxurong
*/
public
class
ConfigCheck
{
public
static
void
checkMeterConfig
(
BootstrapConfig
.
MeterConfig
config
)
{
if
(
config
!=
null
)
{
if
(
config
.
getMeterType
()
==
MeterType
.
INFLUXDB
)
{
if
(
config
.
getInfluxdb
()
!=
null
)
{
Objects
.
requireNonNull
(
config
.
getInfluxdb
().
getUri
());
Objects
.
requireNonNull
(
config
.
getInfluxdb
().
getDb
());
}
}
}
}
}
This diff is collapsed.
Click to expand it.
smqtt-common/src/main/java/io/github/quickmsg/common/context/ContextHolder.java
0 → 100644
+
21
-
0
View file @
0ec87021
package
io.github.quickmsg.common.context
;
import
io.github.quickmsg.common.config.Configuration
;
/**
* @author luxurong
*/
public
class
ContextHolder
{
private
static
ReceiveContext
<?>
context
;
public
static
void
setReceiveContext
(
ReceiveContext
<?>
context
){
ContextHolder
.
context
=
context
;
}
public
static
ReceiveContext
<?>
getReceiveContext
(){
return
ContextHolder
.
context
;
}
}
This diff is collapsed.
Click to expand it.
smqtt-common/src/main/java/io/github/quickmsg/common/metric/MetricManager.java
+
11
-
7
View file @
0ec87021
...
...
@@ -2,7 +2,9 @@ package io.github.quickmsg.common.metric;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
io.github.quickmsg.common.config.BootstrapConfig
;
import
io.github.quickmsg.common.context.ContextHolder
;
import
io.github.quickmsg.common.utils.FormatUtils
;
import
io.netty.handler.traffic.TrafficCounter
;
import
oshi.SystemInfo
;
import
oshi.hardware.CentralProcessor
;
import
oshi.hardware.HardwareAbstractionLayer
;
...
...
@@ -86,19 +88,21 @@ public interface MetricManager {
metrics
.
put
(
"iowait"
,
new
DecimalFormat
(
"#.##%"
).
format
(
iowait
*
1.0
/
totalCpu
));
//cpu当前使用率
metrics
.
put
(
"idle"
,
new
DecimalFormat
(
"#.##%"
).
format
(
1.0
-
(
idle
*
1.0
/
totalCpu
)));
//cpu核数
metrics
.
put
(
"cpuNum"
,
Runtime
.
getRuntime
().
availableProcessors
());
return
metrics
;
}
default
Map
<
String
,
Object
>
getCounterMetric
()
{
Map
<
String
,
Object
>
metrics
=
new
HashMap
<>();
metrics
.
put
(
"connect_size"
,
getMetricRegistry
().
getMetricCounter
(
CounterType
.
CONNECT
).
getCounter
());
metrics
.
put
(
"topic_size"
,
getMetricRegistry
().
getMetricCounter
(
CounterType
.
CONNECT
).
getCounter
());
metrics
.
put
(
"read_size"
,
FormatUtils
.
formatByte
(((
WindowCounter
)
getMetricRegistry
().
getMetricCounter
(
CounterType
.
READ
)).
getAllCount
()));
metrics
.
put
(
"read_hour_size"
,
FormatUtils
.
formatByte
(((
WindowCounter
)
getMetricRegistry
().
getMetricCounter
(
CounterType
.
READ
)).
getWindowCount
()));
metrics
.
put
(
"write_size"
,
FormatUtils
.
formatByte
(((
WindowCounter
)
getMetricRegistry
().
getMetricCounter
(
CounterType
.
WRITE
)).
getAllCount
()));
metrics
.
put
(
"write_hour_size"
,
FormatUtils
.
formatByte
(((
WindowCounter
)
getMetricRegistry
().
getMetricCounter
(
CounterType
.
WRITE
)).
getWindowCount
()));
metrics
.
put
(
"subscribe_size"
,
getMetricRegistry
().
getMetricCounter
(
CounterType
.
SUBSCRIBE
).
getCounter
());
metrics
.
put
(
"publish_size"
,
getMetricRegistry
().
getMetricCounter
(
CounterType
.
PUBLISH
).
getCounter
());
metrics
.
put
(
"disconnect_size"
,
getMetricRegistry
().
getMetricCounter
(
CounterType
.
DIS_CONNECT
).
getCounter
());
metrics
.
put
(
"un_subscribe_size"
,
getMetricRegistry
().
getMetricCounter
(
CounterType
.
UN_SUBSCRIBE
).
getCounter
());
TrafficCounter
trafficCounter
=
ContextHolder
.
getReceiveContext
().
getTrafficHandlerLoader
().
get
().
trafficCounter
();
metrics
.
put
(
"read_size"
,
FormatUtils
.
formatByte
(
trafficCounter
.
cumulativeReadBytes
()
));
metrics
.
put
(
"read_second_size"
,
FormatUtils
.
formatByte
(
trafficCounter
.
currentReadBytes
()));
metrics
.
put
(
"write_size"
,
FormatUtils
.
formatByte
(
trafficCounter
.
cumulativeWrittenBytes
()));
metrics
.
put
(
"write_second_size"
,
FormatUtils
.
formatByte
(
trafficCounter
.
currentWrittenBytes
()));
return
metrics
;
}
...
...
This diff is collapsed.
Click to expand it.
smqtt-core/src/main/java/io/github/quickmsg/core/DefaultTransport.java
+
7
-
6
View file @
0ec87021
package
io.github.quickmsg.core
;
import
io.github.quickmsg.common.Receiver
;
import
io.github.quickmsg.common.context.ContextHolder
;
import
io.github.quickmsg.common.context.ReceiveContext
;
import
io.github.quickmsg.common.transport.Transport
;
import
io.github.quickmsg.core.mqtt.MqttConfiguration
;
...
...
@@ -26,8 +27,6 @@ public class DefaultTransport implements Transport<MqttConfiguration> {
private
DisposableServer
disposableServer
;
public
volatile
static
ReceiveContext
<
MqttConfiguration
>
receiveContext
;
public
DefaultTransport
(
MqttConfiguration
configuration
,
Receiver
receiver
)
{
this
.
configuration
=
configuration
;
this
.
receiver
=
receiver
;
...
...
@@ -38,7 +37,7 @@ public class DefaultTransport implements Transport<MqttConfiguration> {
@Override
public
Mono
<
Transport
>
start
()
{
return
Mono
.
deferContextual
(
contextView
->
receiver
.
bind
())
receiver
.
bind
())
.
doOnNext
(
this
::
bindSever
)
.
thenReturn
(
this
)
.
doOnSuccess
(
defaultTransport
->
log
.
info
(
"server start success host {} port {}"
,
disposableServer
.
host
(),
disposableServer
.
port
()))
...
...
@@ -48,12 +47,14 @@ public class DefaultTransport implements Transport<MqttConfiguration> {
@Override
@SuppressWarnings
(
"unchecked"
)
public
ReceiveContext
<
MqttConfiguration
>
buildReceiveContext
(
MqttConfiguration
mqttConfiguration
)
{
synchronized
(
this
)
{
if
(
DefaultTransport
.
receiveContext
==
null
)
{
DefaultTransport
.
receiveContext
=
new
MqttReceiveContext
(
mqttConfiguration
,
this
);
if
(
ContextHolder
.
getReceiveContext
()
==
null
)
{
MqttReceiveContext
receiveContext
=
new
MqttReceiveContext
(
mqttConfiguration
,
this
);
ContextHolder
.
setReceiveContext
(
receiveContext
);
}
return
DefaultTransport
.
r
eceiveContext
;
return
(
ReceiveContext
<
MqttConfiguration
>)
ContextHolder
.
getR
eceiveContext
()
;
}
}
...
...
This diff is collapsed.
Click to expand it.
smqtt-core/src/main/java/io/github/quickmsg/core/http/AbstractHttpActor.java
+
7
-
1
View file @
0ec87021
package
io.github.quickmsg.core.http
;
import
io.github.quickmsg.common.channel.MockMqttChannel
;
import
io.github.quickmsg.common.context.ContextHolder
;
import
io.github.quickmsg.common.message.SmqttMessage
;
import
io.github.quickmsg.common.http.HttpActor
;
import
io.github.quickmsg.core.DefaultTransport
;
...
...
@@ -17,6 +18,11 @@ public abstract class AbstractHttpActor implements HttpActor {
* @param mqttPublishMessage publish消息
*/
public
void
sendMqttMessage
(
MqttPublishMessage
mqttPublishMessage
)
{
DefaultTransport
.
receiveContext
.
getProtocolAdaptor
().
chooseProtocol
(
MockMqttChannel
.
DEFAULT_MOCK_CHANNEL
,
new
SmqttMessage
<>(
mqttPublishMessage
,
System
.
currentTimeMillis
(),
Boolean
.
FALSE
),
DefaultTransport
.
receiveContext
);
ContextHolder
.
getReceiveContext
()
.
getProtocolAdaptor
()
.
chooseProtocol
(
MockMqttChannel
.
DEFAULT_MOCK_CHANNEL
,
new
SmqttMessage
<>(
mqttPublishMessage
,
System
.
currentTimeMillis
(),
Boolean
.
FALSE
),
ContextHolder
.
getReceiveContext
());
}
}
This diff is collapsed.
Click to expand it.
smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/ClusterActor.java
+
2
-
1
View file @
0ec87021
...
...
@@ -4,6 +4,7 @@ import io.github.quickmsg.common.annotation.AllowCors;
import
io.github.quickmsg.common.annotation.Header
;
import
io.github.quickmsg.common.annotation.Router
;
import
io.github.quickmsg.common.config.Configuration
;
import
io.github.quickmsg.common.context.ContextHolder
;
import
io.github.quickmsg.common.enums.HttpType
;
import
io.github.quickmsg.common.http.HttpActor
;
import
io.github.quickmsg.common.utils.JacksonUtil
;
...
...
@@ -28,6 +29,6 @@ public class ClusterActor implements HttpActor {
public
Publisher
<
Void
>
doRequest
(
HttpServerRequest
request
,
HttpServerResponse
response
,
Configuration
httpConfiguration
)
{
return
request
.
receive
()
.
then
(
response
.
sendString
(
Mono
.
just
(
JacksonUtil
.
bean2Json
(
DefaultTransport
.
r
eceiveContext
.
getClusterRegistry
().
getClusterNode
()))).
then
());
.
then
(
response
.
sendString
(
Mono
.
just
(
JacksonUtil
.
bean2Json
(
ContextHolder
.
getR
eceiveContext
()
.
getClusterRegistry
().
getClusterNode
()))).
then
());
}
}
This diff is collapsed.
Click to expand it.
smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/ConnectionActor.java
+
2
-
1
View file @
0ec87021
...
...
@@ -4,6 +4,7 @@ import io.github.quickmsg.common.annotation.AllowCors;
import
io.github.quickmsg.common.annotation.Header
;
import
io.github.quickmsg.common.annotation.Router
;
import
io.github.quickmsg.common.config.Configuration
;
import
io.github.quickmsg.common.context.ContextHolder
;
import
io.github.quickmsg.common.enums.HttpType
;
import
io.github.quickmsg.common.utils.JacksonUtil
;
import
io.github.quickmsg.core.DefaultTransport
;
...
...
@@ -31,7 +32,7 @@ public class ConnectionActor extends AbstractHttpActor {
.
receive
()
.
then
(
response
.
sendString
(
Mono
.
just
(
JacksonUtil
.
bean2Json
(
DefaultTransport
.
r
eceiveContext
.
getChannelRegistry
().
getChannels
()
ContextHolder
.
getR
eceiveContext
()
.
getChannelRegistry
().
getChannels
()
.
stream
()
.
map
(
record
->
{
record
.
setAddress
(
record
.
getAddress
().
replaceAll
(
"/"
,
""
));
...
...
This diff is collapsed.
Click to expand it.
smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/IsClusterActor.java
+
2
-
1
View file @
0ec87021
...
...
@@ -3,6 +3,7 @@ package io.github.quickmsg.core.http.actors;
import
io.github.quickmsg.common.annotation.AllowCors
;
import
io.github.quickmsg.common.annotation.Router
;
import
io.github.quickmsg.common.config.Configuration
;
import
io.github.quickmsg.common.context.ContextHolder
;
import
io.github.quickmsg.common.enums.HttpType
;
import
io.github.quickmsg.common.http.HttpActor
;
import
io.github.quickmsg.core.DefaultTransport
;
...
...
@@ -25,6 +26,6 @@ public class IsClusterActor implements HttpActor {
public
Publisher
<
Void
>
doRequest
(
HttpServerRequest
request
,
HttpServerResponse
response
,
Configuration
httpConfiguration
)
{
return
request
.
receive
()
.
then
(
response
.
sendString
(
Mono
.
just
(
String
.
valueOf
(
DefaultTransport
.
r
eceiveContext
.
getConfiguration
().
getClusterConfig
().
isEnable
()))).
then
());
.
then
(
response
.
sendString
(
Mono
.
just
(
String
.
valueOf
(
ContextHolder
.
getR
eceiveContext
()
.
getConfiguration
().
getClusterConfig
().
isEnable
()))).
then
());
}
}
This diff is collapsed.
Click to expand it.
smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/SubscribeActor.java
+
2
-
1
View file @
0ec87021
...
...
@@ -4,6 +4,7 @@ import io.github.quickmsg.common.annotation.AllowCors;
import
io.github.quickmsg.common.annotation.Header
;
import
io.github.quickmsg.common.annotation.Router
;
import
io.github.quickmsg.common.config.Configuration
;
import
io.github.quickmsg.common.context.ContextHolder
;
import
io.github.quickmsg.common.enums.HttpType
;
import
io.github.quickmsg.common.http.HttpActor
;
import
io.github.quickmsg.common.utils.JacksonUtil
;
...
...
@@ -28,7 +29,7 @@ public class SubscribeActor implements HttpActor {
return
request
.
receive
()
.
then
(
response
.
sendString
(
Mono
.
just
(
JacksonUtil
.
bean2Json
(
DefaultTransport
.
r
eceiveContext
.
getTopicRegistry
().
getAllTopics
())))
.
sendString
(
Mono
.
just
(
JacksonUtil
.
bean2Json
(
ContextHolder
.
getR
eceiveContext
()
.
getTopicRegistry
().
getAllTopics
())))
.
then
());
}
}
This diff is collapsed.
Click to expand it.
smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
+
6
-
4
View file @
0ec87021
...
...
@@ -6,6 +6,7 @@ import io.github.quickmsg.common.channel.traffic.TrafficHandlerLoader;
import
io.github.quickmsg.common.cluster.ClusterRegistry
;
import
io.github.quickmsg.common.config.AbstractConfiguration
;
import
io.github.quickmsg.common.config.BootstrapConfig
;
import
io.github.quickmsg.common.config.ConfigCheck
;
import
io.github.quickmsg.common.config.Configuration
;
import
io.github.quickmsg.common.context.ReceiveContext
;
import
io.github.quickmsg.common.enums.Event
;
...
...
@@ -96,17 +97,17 @@ public abstract class AbstractReceiveContext<T extends Configuration> implements
private
TrafficHandlerLoader
trafficHandlerLoader
()
{
if
(
configuration
.
getGlobalReadWriteSize
()
==
null
&&
configuration
.
getChannelReadWriteSize
()
==
null
)
{
return
new
CacheTrafficHandlerLoader
(
new
GlobalTrafficShapingHandler
(
this
.
loopResources
.
onServer
(
true
).
next
()));
return
new
CacheTrafficHandlerLoader
(
new
GlobalTrafficShapingHandler
(
this
.
loopResources
.
onServer
(
true
).
next
()
,
60
*
1000
));
}
else
if
(
configuration
.
getChannelReadWriteSize
()
==
null
)
{
String
[]
limits
=
configuration
.
getGlobalReadWriteSize
().
split
(
","
);
return
new
CacheTrafficHandlerLoader
(
new
GlobalTrafficShapingHandler
(
this
.
loopResources
.
onServer
(
true
),
Long
.
parseLong
(
limits
[
1
]),
Long
.
parseLong
(
limits
[
0
])));
return
new
CacheTrafficHandlerLoader
(
new
GlobalTrafficShapingHandler
(
this
.
loopResources
.
onServer
(
true
),
Long
.
parseLong
(
limits
[
1
]),
Long
.
parseLong
(
limits
[
0
])
,
60
*
1000
));
}
else
if
(
configuration
.
getGlobalReadWriteSize
()
==
null
)
{
String
[]
limits
=
configuration
.
getChannelReadWriteSize
().
split
(
","
);
return
new
LazyTrafficHandlerLoader
(()
->
new
GlobalTrafficShapingHandler
(
this
.
loopResources
.
onServer
(
true
),
Long
.
parseLong
(
limits
[
1
]),
Long
.
parseLong
(
limits
[
0
])));
return
new
LazyTrafficHandlerLoader
(()
->
new
GlobalTrafficShapingHandler
(
this
.
loopResources
.
onServer
(
true
),
Long
.
parseLong
(
limits
[
1
]),
Long
.
parseLong
(
limits
[
0
])
,
60
*
1000
));
}
else
{
String
[]
globalLimits
=
configuration
.
getGlobalReadWriteSize
().
split
(
","
);
String
[]
channelLimits
=
configuration
.
getChannelReadWriteSize
().
split
(
","
);
return
new
CacheTrafficHandlerLoader
(
new
GlobalChannelTrafficShapingHandler
(
this
.
loopResources
.
onServer
(
true
),
Long
.
parseLong
(
globalLimits
[
1
]),
Long
.
parseLong
(
globalLimits
[
0
]),
Long
.
parseLong
(
channelLimits
[
1
]),
Long
.
parseLong
(
channelLimits
[
0
])));
return
new
CacheTrafficHandlerLoader
(
new
GlobalChannelTrafficShapingHandler
(
this
.
loopResources
.
onServer
(
true
),
Long
.
parseLong
(
globalLimits
[
1
]),
Long
.
parseLong
(
globalLimits
[
0
]),
Long
.
parseLong
(
channelLimits
[
1
]),
Long
.
parseLong
(
channelLimits
[
0
])
,
60
*
1000
));
}
}
...
...
@@ -142,6 +143,7 @@ public abstract class AbstractReceiveContext<T extends Configuration> implements
private
MetricManager
metricManager
(
BootstrapConfig
.
MeterConfig
meterConfig
)
{
ConfigCheck
.
checkMeterConfig
(
meterConfig
);
return
MetricManagerHolder
.
setMetricManager
(
Optional
.
ofNullable
(
meterConfig
).
map
(
config
->
{
switch
(
config
.
getMeterType
())
{
case
INFLUXDB:
...
...
This diff is collapsed.
Click to expand it.
smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
+
1
-
1
View file @
0ec87021
...
...
@@ -62,6 +62,6 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon
private
Map
<
Object
,
Object
>
environmentMap
;
private
Integer
messageMaxSize
;
private
Integer
messageMaxSize
=
4194304
;
}
This diff is collapsed.
Click to expand it.
smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttReceiver.java
+
0
-
1
View file @
0ec87021
...
...
@@ -44,7 +44,6 @@ public class MqttReceiver extends AbstractSslHandler implements Receiver {
.
addHandler
(
MqttEncoder
.
INSTANCE
)
.
addHandler
(
new
MqttDecoder
(
mqttConfiguration
.
getMessageMaxSize
()))
.
addHandler
(
receiveContext
.
getTrafficHandlerLoader
().
get
());
receiveContext
.
apply
(
MqttChannel
.
init
(
connection
));
});
}
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment