Commit bc67340f authored by luxurong's avatar luxurong
Browse files

Bootstrap update

parent 11b21954
Showing with 190 additions and 111 deletions
+190 -111
......@@ -55,7 +55,7 @@
</tags>
</to>
<container>
<mainClass>com.github.smqtt.container.DockerBootstrap</mainClass>
<mainClass>com.github.smqtt.docker.DockerStarter</mainClass>
</container>
</configuration>
<!-- <executions>-->
......@@ -85,7 +85,7 @@
<archive>
<manifest>
<!-- 此处指定main方法入口的class -->
<mainClass>com.github.smqtt.jar.JarBootstrap</mainClass>
<mainClass>com.github.smqtt.jar.JarStarter</mainClass>
</manifest>
</archive>
</configuration>
......
package com.github.smqtt;
import io.netty.channel.WriteBufferWaterMark;
import java.util.Optional;
import java.util.function.Function;
/**
* @author luxurong
* @date 2021/4/15 14:14
* @description
*/
public abstract class AbstractBootstrap {
private static final Integer DEFAULT_MQTT_PORT = 1883;
private static final Integer DEFAULT_WEBSOCKET_MQTT_PORT = 8999;
private static final String DEFAULT_AUTH_USERNAME_PASSWORD = "smqtt";
public static void bootstrap(Function<String, String> function) {
Integer port = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_PORT))
.map(Integer::parseInt).orElse(DEFAULT_MQTT_PORT);
Integer lowWaterMark = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_LOW_WATERMARK))
.map(Integer::parseInt).orElse(WriteBufferWaterMark.DEFAULT.low());
Integer highWaterMark = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_HIGH_WATERMARK))
.map(Integer::parseInt).orElse(WriteBufferWaterMark.DEFAULT.high());
Boolean wiretap = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_WIRETAP))
.map(Boolean::parseBoolean).orElse(false);
Integer bossThreadSize = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_BOSS_THREAD_SIZE))
.map(Integer::parseInt).orElse(Runtime.getRuntime().availableProcessors() >> 1);
Integer workThreadSize = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_WORK_THREAD_SIZE))
.map(Integer::parseInt).orElse(Runtime.getRuntime().availableProcessors());
Boolean isWebsocket = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_WEB_SOCKET_ENABLE))
.map(Boolean::parseBoolean).orElse(false);
Boolean ssl = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_SSL))
.map(Boolean::parseBoolean).orElse(false);
Integer websocketPort = 0;
if (isWebsocket) {
websocketPort = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_WEB_SOCKET_PORT))
.map(Integer::parseInt).orElse(DEFAULT_WEBSOCKET_MQTT_PORT);
}
String username = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_USERNAME))
.map(String::valueOf).orElse(DEFAULT_AUTH_USERNAME_PASSWORD);
String password = Optional.ofNullable(function.apply(BootstrapKey.BOOTSTRAP_PASSWORD))
.map(String::valueOf).orElse(DEFAULT_AUTH_USERNAME_PASSWORD);
Bootstrap.BootstrapBuilder builder = Bootstrap.builder();
builder.port(port)
.reactivePasswordAuth(((userName, passwordInBytes) -> userName.equals(username) && password.equals(new String(passwordInBytes))))
.bossThreadSize(bossThreadSize)
.wiretap(wiretap)
.ssl(ssl)
.workThreadSize(workThreadSize)
.lowWaterMark(lowWaterMark)
.highWaterMark(highWaterMark);
if (isWebsocket) {
builder.isWebsocket(true)
.websocketPort(websocketPort);
}
builder.build().startAwait();
}
}
package com.github.smqtt;
import com.github.smqtt.common.config.SslContext;
import com.github.smqtt.common.utils.PropertiesLoader;
import io.netty.channel.WriteBufferWaterMark;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
/**
* @author luxurong
* @date 2021/4/15 14:14
* @description
*/
public abstract class AbstractStarter {
private static final String DEFAULT_PROPERTIES_LOAD_CONFIG_PATH = "/conf/config.properties";
private static final Integer DEFAULT_MQTT_PORT = 1883;
private static final Integer DEFAULT_WEBSOCKET_MQTT_PORT = 8999;
private static final Integer DEFAULT_HTTP_PORT = 12000;
private static final String DEFAULT_AUTH_USERNAME_PASSWORD = "smqtt";
public static void start(Function<String, String> function) {
start(function, null);
}
public static void start(Function<String, String> function, String path) {
path = path == null ? DEFAULT_PROPERTIES_LOAD_CONFIG_PATH : path;
Map<String, String> params = PropertiesLoader.loadProperties(path);
Integer port = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_PORT, function.apply(BootstrapKey.BOOTSTRAP_PORT)))
.map(Integer::parseInt).orElse(DEFAULT_MQTT_PORT);
Integer lowWaterMark = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_LOW_WATERMARK, function.apply(BootstrapKey.BOOTSTRAP_LOW_WATERMARK)))
.map(Integer::parseInt).orElse(WriteBufferWaterMark.DEFAULT.low());
Integer highWaterMark = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_HIGH_WATERMARK, function.apply(BootstrapKey.BOOTSTRAP_HIGH_WATERMARK)))
.map(Integer::parseInt).orElse(WriteBufferWaterMark.DEFAULT.high());
Boolean wiretap = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_WIRETAP, function.apply(BootstrapKey.BOOTSTRAP_WIRETAP)))
.map(Boolean::parseBoolean).orElse(false);
Integer bossThreadSize = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_BOSS_THREAD_SIZE, function.apply(BootstrapKey.BOOTSTRAP_BOSS_THREAD_SIZE)))
.map(Integer::parseInt).orElse(Runtime.getRuntime().availableProcessors() >> 1);
Integer workThreadSize = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_WORK_THREAD_SIZE, function.apply(BootstrapKey.BOOTSTRAP_WORK_THREAD_SIZE)))
.map(Integer::parseInt).orElse(Runtime.getRuntime().availableProcessors());
Boolean isWebsocket = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_WEB_SOCKET_ENABLE, function.apply(BootstrapKey.BOOTSTRAP_WEB_SOCKET_ENABLE)))
.map(Boolean::parseBoolean).orElse(false);
Boolean ssl = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_SSL, function.apply(BootstrapKey.BOOTSTRAP_SSL)))
.map(Boolean::parseBoolean).orElse(false);
String username = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_USERNAME, function.apply(BootstrapKey.BOOTSTRAP_USERNAME)))
.map(String::valueOf).orElse(DEFAULT_AUTH_USERNAME_PASSWORD);
String password = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_PASSWORD, function.apply(BootstrapKey.BOOTSTRAP_PASSWORD)))
.map(String::valueOf).orElse(DEFAULT_AUTH_USERNAME_PASSWORD);
Boolean httpEnable = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_HTTP_ENABLE, function.apply(BootstrapKey.BOOTSTRAP_HTTP_ENABLE)))
.map(Boolean::parseBoolean).orElse(false);
Bootstrap.BootstrapBuilder builder = Bootstrap.builder();
builder.port(port)
.reactivePasswordAuth(((userName, passwordInBytes) -> userName.equals(username) && password.equals(new String(passwordInBytes))))
.bossThreadSize(bossThreadSize)
.wiretap(wiretap)
.ssl(ssl)
.workThreadSize(workThreadSize)
.lowWaterMark(lowWaterMark)
.highWaterMark(highWaterMark);
if (ssl) {
String sslCrt = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_SSL_CRT, function.apply(BootstrapKey.BOOTSTRAP_SSL_CRT)))
.map(String::valueOf).orElse(null);
String sslKey = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_SSL_KEY, function.apply(BootstrapKey.BOOTSTRAP_SSL_KEY)))
.map(String::valueOf).orElse(null);
if (sslCrt == null || sslKey == null) {
builder.sslContext(null);
} else {
builder.sslContext(new SslContext(sslCrt, sslKey));
}
}
if (isWebsocket) {
Integer websocketPort = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_WEB_SOCKET_PORT, function.apply(BootstrapKey.BOOTSTRAP_WEB_SOCKET_PORT)))
.map(Integer::parseInt).orElse(DEFAULT_WEBSOCKET_MQTT_PORT);
builder.isWebsocket(true)
.websocketPort(websocketPort);
}
if (httpEnable) {
Bootstrap.HttpOptions.HttpOptionsBuilder optionsBuilder = Bootstrap.HttpOptions.builder();
Integer httpPort = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_HTTP_PORT, function.apply(BootstrapKey.BOOTSTRAP_HTTP_PORT)))
.map(Integer::parseInt).orElse(DEFAULT_HTTP_PORT);
Boolean accessLog = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_HTTP_ACCESS_LOG, function.apply(BootstrapKey.BOOTSTRAP_HTTP_ACCESS_LOG)))
.map(Boolean::parseBoolean).orElse(false);
Boolean httpSsl = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_HTTP_SSL_ENABLE, function.apply(BootstrapKey.BOOTSTRAP_HTTP_SSL_ENABLE)))
.map(Boolean::parseBoolean).orElse(false);
if (httpSsl) {
String httpSslCrt = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_HTTP_SSL_CRT, function.apply(BootstrapKey.BOOTSTRAP_HTTP_SSL_CRT)))
.map(String::valueOf).orElse(null);
String httpSslKey = Optional.ofNullable(params.getOrDefault(BootstrapKey.BOOTSTRAP_HTTP_SSL_KEY, function.apply(BootstrapKey.BOOTSTRAP_HTTP_SSL_KEY)))
.map(String::valueOf).orElse(null);
if (httpSslKey == null || httpSslCrt == null) {
optionsBuilder.sslContext(null);
} else {
optionsBuilder.sslContext(new SslContext(httpSslCrt, httpSslKey));
}
}
optionsBuilder.httpPort(httpPort)
.accessLog(accessLog)
.ssl(httpSsl);
Bootstrap.HttpOptions options = optionsBuilder.build();
builder.httpOptions(options);
}
builder.build().startAwait();
}
}
......@@ -172,7 +172,6 @@ public class Bootstrap {
private HttpConfiguration buildHttpConfiguration() {
HttpConfiguration httpConfiguration = new HttpConfiguration();
Optional.ofNullable(this.httpOptions.wiretap).ifPresent(httpConfiguration::setWiretap);
Optional.ofNullable(this.httpOptions.accessLog).ifPresent(httpConfiguration::setAccessLog);
Optional.ofNullable(this.httpOptions.sslContext).ifPresent(httpConfiguration::setSslContext);
Optional.ofNullable(this.httpOptions.httpPort).ifPresent(httpConfiguration::setPort);
......@@ -191,7 +190,7 @@ public class Bootstrap {
private Integer httpPort = 0;
@Builder.Default
private Boolean wiretap = false;
private Boolean ssl = false;
private SslContext sslContext;
......
......@@ -40,6 +40,8 @@ public class BootstrapKey {
public final static String BOOTSTRAP_HTTP_ACCESS_LOG = "smqtt.http.accesslog";
public final static String BOOTSTRAP_HTTP_SSL_ENABLE = "smqtt.http.ssl.enable";
public final static String BOOTSTRAP_HTTP_SSL_CRT = "smqtt.http.ssl.crt";
public final static String BOOTSTRAP_HTTP_SSL_KEY = "smqtt.http.ssl.key";
......
package com.github.smqtt.container;
import com.github.smqtt.AbstractBootstrap;
/**
* @author luxurong
* @date 2021/4/14 20:40
* @description
*/
public class DockerBootstrap extends AbstractBootstrap {
public static void main(String[] args) {
bootstrap(System::getenv);
}
}
package com.github.smqtt.docker;
import com.github.smqtt.AbstractStarter;
import lombok.extern.slf4j.Slf4j;
/**
* @author luxurong
* @date 2021/4/14 20:40
* @description
*/
@Slf4j
public class DockerStarter extends AbstractStarter {
private static final String CONFIG_MAPPER_DIR = "/conf/config.properties";
public static void main(String[] args) {
log.info("DockerStarter start args {}", String.join(",", args));
start(System::getenv, CONFIG_MAPPER_DIR);
}
}
package com.github.smqtt.jar;
import com.github.smqtt.AbstractBootstrap;
/**
* @author luxurong
* @date 2021/4/14 20:39
* @description
*/
public class JarBootstrap extends AbstractBootstrap {
public static void main(String[] args) {
bootstrap(System::getProperty);
}
}
package com.github.smqtt.jar;
import com.github.smqtt.AbstractStarter;
import lombok.extern.slf4j.Slf4j;
/**
* @author luxurong
* @date 2021/4/14 20:39
* @description
*/
@Slf4j
public class JarStarter extends AbstractStarter {
public static void main(String[] args) {
log.info("JarStarter start args {}", String.join(",", args));
if (args.length > 0) {
start(System::getProperty, args[0]);
} else {
start(System::getProperty, args[0]);
} }
}
package com.github.smqtt.common.utils;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.util.Enumeration;
import java.util.HashMap;
......@@ -11,6 +13,7 @@ import java.util.Properties;
* @date 2021/4/22 18:36
* @description
*/
@Slf4j
public class PropertiesLoader {
......@@ -21,7 +24,6 @@ public class PropertiesLoader {
InputStream inputStream = new BufferedInputStream(new FileInputStream(new File(filePath)));
prop.load(inputStream);
} catch (IOException e) {
e.printStackTrace();
}
return map;
}
......
package com.github.smqtt.core.http;
import com.github.smqtt.common.Receiver;
import com.github.smqtt.core.ssl.AbstractSslHandler;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.json.JsonObjectDecoder;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
......@@ -13,15 +13,18 @@ import reactor.netty.http.server.HttpServer;
* @date 2021/4/19 15:07
* @description
*/
public class HttpReceiver implements Receiver {
public class HttpReceiver extends AbstractSslHandler implements Receiver {
@Override
public Mono<DisposableServer> bind() {
return Mono.deferContextual(contextView -> {
HttpConfiguration configuration = contextView.get(HttpConfiguration.class);
return HttpServer
.create().port(configuration.getPort())
HttpServer httpServer = HttpServer.create();
if (configuration.getSsl()) {
httpServer.secure(sslContextSpec -> this.secure(sslContextSpec, configuration));
}
return httpServer.port(configuration.getPort())
.route(new HttpRouterAcceptor())
.accessLog(configuration.getAccessLog())
.childOption(ChannelOption.TCP_NODELAY, true)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment