Commit 7e642791 authored by haozi's avatar haozi
Browse files

线程池指令下发agent

parent 4c69be8a
Showing with 204 additions and 0 deletions
+204 -0
package com.cubic.agent.cmd.jvm.threadpool;
/**
* 线程池下发命令接收对象
*
* @author zhanghao
* @date 2021/4/134:58 下午
*/
public class ThreadPoolCommandBody {
private String key;
private String name;
private Object arg;
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Object getArg() {
return arg;
}
public void setArg(Object arg) {
this.arg = arg;
}
}
package com.cubic.agent.cmd.jvm.threadpool;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
* 命令修改线程池参数
*
* @author zhanghao
* @date 2021/4/59:46 上午
*/
public enum ThreadPoolCommandItems {
/**
* 修改核心线程数
*/
CORE_POOL_SIZE("setCorePoolSize", (t, value) -> t.setCorePoolSize((Integer) value)),
/**
* 修改存活时间
*/
KEEP_ALIVE_TIME("setKeepAliveTime", (t, v) -> t.setKeepAliveTime((Long) v, TimeUnit.MILLISECONDS)),
/**
* 修改最大线程数
*/
MAXIMUM_POOL_SIZE("setMaximumPoolSize", (t, v) -> t.setMaximumPoolSize((Integer) v)),
/**
* 修改拒绝策略
*/
REJECTED_EXECUTION_HANDLER("setRejectedExecutionHandler", (t, v) -> {
for (RejectedExecutionHandler rejectedExecutionHandler : RejectedExecution.HANDLERS) {
if (rejectedExecutionHandler.getClass().getName().equals(v.toString())) {
t.setRejectedExecutionHandler(rejectedExecutionHandler);
return;
}
}
new IllegalArgumentException("not found `RejectedExecutionHandler` for [" + v + "]");
}),
/**
* 修改创建线程工厂
*/
THREAD_FACTORY("setThreadFactory", (t, v) -> {
throw new UnsupportedOperationException("Unsupported Sets the thread factory");
});
// 方法名
private String methodName;
// 赋值函数
private BiConsumer<ThreadPoolExecutor, Object> func;
ThreadPoolCommandItems(String methodName, BiConsumer<ThreadPoolExecutor, Object> func) {
this.methodName = methodName;
this.func = func;
}
/**
* 修改数据参数
*
* @param tpe 线程池
* @param methodName 方法名
* @param arg 参数
*/
public static void setItem(ThreadPoolExecutor tpe, String methodName, Object arg) {
for (ThreadPoolCommandItems item : values()) {
if (item.methodName.equals(methodName)) {
item.func.accept(tpe, arg);
return;
}
}
}
/**
* 线程池拒绝策略
*/
static class RejectedExecution {
/**
* 策略
*/
private final static RejectedExecutionHandler[] HANDLERS = {
/**
* 丢弃任务并抛出RejectedExecutionException异常
*/
new ThreadPoolExecutor.AbortPolicy(),
/**
* 由调用线程(提交任务的线程)处理该任务
*/
new ThreadPoolExecutor.CallerRunsPolicy(),
/**
* 丢弃队列最前面的任务,然后重新提交被拒绝的任务
*/
new ThreadPoolExecutor.DiscardOldestPolicy(),
/**
* 丢弃任务,但是不抛出异常
*/
new ThreadPoolExecutor.DiscardPolicy()
};
}
}
package com.cubic.agent.cmd.jvm.threadpool;
import com.cubic.agent.boot.ServiceManager;
import com.cubic.agent.process.Processor;
import com.cubic.agent.remote.CommandCode;
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 处理线程池下发参数
*
* @author zhanghao
* @date 2021/4/1311:23 上午
*/
public class ThreadPoolProcessor implements Processor {
private static final Logger log = LoggerFactory.getLogger(ThreadPoolProcessor.class);
private final ThreadPoolService service;
@Override
public List<Integer> types() {
return ImmutableList.of(CommandCode.JVM_THREAD_POOL.getCode());
}
public ThreadPoolProcessor() {
service = ServiceManager.INSTANCE.findService(ThreadPoolService.class);
if (service == null) {
log.warn("ServiceManager not find service `ThreadPoolMonitorService.class`");
}
}
@Override
public void process(ChannelHandlerContext ctx, String id, String command, String body) throws ClassNotFoundException {
if (service == null) {
log.error("service is null");
return;
}
Gson gson = new Gson();
ThreadPoolCommandBody threadPoolCommandBody = gson.fromJson(body, ThreadPoolCommandBody.class);
String key = threadPoolCommandBody.getKey();
String methodName = threadPoolCommandBody.getName();
Object arg = threadPoolCommandBody.getArg();
if (key == null || methodName == null || arg == null) {
log.error("ThreadPoolProcessor command is null, key [{}], name [{}], arg [{}]", key, threadPoolCommandBody.getName(), threadPoolCommandBody.getArg());
}
ThreadPoolExecutor monitorThreadPool = service.getMonitor(threadPoolCommandBody.getKey());
if (monitorThreadPool == null) {
log.warn("thread pool is null, key [{}]", key);
}
ThreadPoolCommandItems.setItem(monitorThreadPool, methodName, arg);
}
}
......@@ -3,3 +3,4 @@ com.cubic.agent.process.HeartbeatProcessor
com.cubic.agent.process.RegisterProcessor
com.cubic.agent.process.ArthasProcessor
com.cubic.agent.process.ThreadDumpProcessor
com.cubic.agent.cmd.jvm.threadpool.ThreadPoolProcessor
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment