Commit a7ef7b2c authored by 向旭's avatar 向旭 Committed by 源三
Browse files

clientoff executor to single thread worker

Showing with 140 additions and 87 deletions
+140 -87
......@@ -64,7 +64,7 @@ public abstract class BaseInfo implements Serializable, StoreData<String> {
private long clientRegisterTimestamp;
private Map<String, String> attributes;
private volatile Map<String, String> attributes;
/** ClientVersion Enum */
public enum ClientVersion {
......@@ -202,11 +202,20 @@ public abstract class BaseInfo implements Serializable, StoreData<String> {
*
* @return property value of attributes
*/
public synchronized Map<String, String> getAttributes() {
if (attributes == null) {
public Map<String, String> getAttributes() {
Map<String, String> attrs = this.attributes;
if (attrs == null) {
return Collections.emptyMap();
}
return new HashMap<>(attributes);
return new HashMap<>(attrs);
}
public String attributeOf(String key) {
Map<String, String> attrs = this.attributes;
if (attrs == null) {
return null;
}
return attrs.get(key);
}
/**
......
......@@ -21,11 +21,10 @@ import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.core.model.ScopeEnum;
import com.alipay.sofa.registry.util.StringFormatter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.springframework.util.CollectionUtils;
/**
* @author shangyu.wh
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.registry.util;
import com.google.common.collect.Sets;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class AtomicSet<T> {
private Set<T> data = Sets.newConcurrentHashSet();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock rlock = lock.readLock();
private final ReentrantReadWriteLock.WriteLock wlock = lock.writeLock();
public void add(T t) {
rlock.lock();
try {
data.add(t);
} finally {
rlock.unlock();
}
}
public Set<T> getAndReset() {
wlock.lock();
try {
Set<T> ret = data;
data = Sets.newConcurrentHashSet();
return ret;
} finally {
wlock.unlock();
}
}
}
......@@ -16,11 +16,11 @@
*/
package com.alipay.sofa.registry.util;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public abstract class WakeUpLoopRunnable extends LoopRunnable {
private final SynchronousQueue<Object> bell = new SynchronousQueue<>();
private final ArrayBlockingQueue<Object> bell = new ArrayBlockingQueue<>(1);
@Override
public void waitingUnthrowable() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.registry.util;
import org.junit.Assert;
import org.junit.Test;
public class AtomicSetTest {
@Test
public void test() {
AtomicSet<String> set = new AtomicSet<>();
set.add("1234");
set.add("1234");
Assert.assertEquals(1, set.getAndReset().size());
Assert.assertEquals(0, set.getAndReset().size());
}
}
......@@ -45,7 +45,6 @@ public class ExecutorManager {
private final ThreadPoolExecutor accessSubExecutor;
private final ThreadPoolExecutor dataChangeRequestExecutor;
private final ThreadPoolExecutor dataSlotSyncRequestExecutor;
private final ThreadPoolExecutor connectClientExecutor;
private final ThreadPoolExecutor accessMetadataExecutor;
private final ThreadPoolExecutor consoleExecutor;
......@@ -154,19 +153,6 @@ public class ExecutorManager {
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(sessionServerConfig.getAccessMetadataMaxBufferSize()),
new NamedThreadFactory(ACCESS_METADATA_EXECUTOR, true)));
connectClientExecutor =
reportExecutors.computeIfAbsent(
CONNECT_CLIENT_EXECUTOR,
k ->
new MetricsableThreadPoolExecutor(
CONNECT_CLIENT_EXECUTOR,
sessionServerConfig.getConnectClientExecutorPoolSize(),
sessionServerConfig.getConnectClientExecutorPoolSize(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue(
sessionServerConfig.getConnectClientExecutorQueueSize()),
new NamedThreadFactory(CONNECT_CLIENT_EXECUTOR, true)));
consoleExecutor =
reportExecutors.computeIfAbsent(
......@@ -192,8 +178,6 @@ public class ExecutorManager {
dataChangeRequestExecutor.shutdown();
dataSlotSyncRequestExecutor.shutdown();
connectClientExecutor.shutdown();
}
public Map<String, ThreadPoolExecutor> getReportExecutors() {
......@@ -216,10 +200,6 @@ public class ExecutorManager {
return dataSlotSyncRequestExecutor;
}
public ThreadPoolExecutor getConnectClientExecutor() {
return connectClientExecutor;
}
public ThreadPoolExecutor getAccessMetadataExecutor() {
return accessMetadataExecutor;
}
......
......@@ -103,10 +103,6 @@ public interface SessionServerConfig extends ServerShareConfig {
int getPushDataTaskDebouncingMillis();
int getConnectClientExecutorPoolSize();
int getConnectClientExecutorQueueSize();
int getDataChangeFetchTaskWorkerSize();
int getSubscriberRegisterTaskWorkerSize();
......
......@@ -76,10 +76,6 @@ public class SessionServerConfigBean implements SessionServerConfig {
private int dataChangeExecutorQueueSize = 20000;
private int connectClientExecutorPoolSize = OsUtils.getCpuCount();
private int connectClientExecutorQueueSize = 2000;
private int dataChangeFetchTaskWorkerSize = OsUtils.getCpuCount() * 6;
private int subscriberRegisterTaskWorkerSize = OsUtils.getCpuCount() * 4;
......@@ -624,44 +620,6 @@ public class SessionServerConfigBean implements SessionServerConfig {
this.pushTaskExecutorQueueSize = pushTaskExecutorQueueSize;
}
/**
* Getter method for property <tt>connectClientExecutorPoolSize</tt>.
*
* @return property value of connectClientExecutorPoolSize
*/
public int getConnectClientExecutorPoolSize() {
return connectClientExecutorPoolSize;
}
/**
* Setter method for property <tt>connectClientExecutorPoolSize</tt>.
*
* @param connectClientExecutorPoolSize value to be assigned to property
* connectClientExecutorMinPoolSize
*/
public void setConnectClientExecutorPoolSize(int connectClientExecutorPoolSize) {
this.connectClientExecutorPoolSize = connectClientExecutorPoolSize;
}
/**
* Getter method for property <tt>connectClientExecutorQueueSize</tt>.
*
* @return property value of connectClientExecutorQueueSize
*/
public int getConnectClientExecutorQueueSize() {
return connectClientExecutorQueueSize;
}
/**
* Setter method for property <tt>connectClientExecutorQueueSize</tt>.
*
* @param connectClientExecutorQueueSize value to be assigned to property
* connectClientExecutorQueueSize
*/
public void setConnectClientExecutorQueueSize(int connectClientExecutorQueueSize) {
this.connectClientExecutorQueueSize = connectClientExecutorQueueSize;
}
/**
* Getter method for property <tt>dataChangeFetchTaskWorkerSize</tt>.
*
......
......@@ -24,20 +24,40 @@ import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.server.session.bootstrap.ExecutorManager;
import com.alipay.sofa.registry.server.session.registry.Registry;
import com.alipay.sofa.registry.server.shared.remoting.ListenServerChannelHandler;
import java.util.Collections;
import com.alipay.sofa.registry.util.AtomicSet;
import com.alipay.sofa.registry.util.ConcurrentUtils;
import com.alipay.sofa.registry.util.WakeUpLoopRunnable;
import com.google.common.collect.Lists;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.util.CollectionUtils;
/**
* @author shangyu.wh
* @version $Id: ServerConnectionLisener.java, v 0.1 2017-11-30 15:04 shangyu.wh Exp $
*/
public class ClientNodeConnectionHandler extends ListenServerChannelHandler {
public class ClientNodeConnectionHandler extends ListenServerChannelHandler
implements ApplicationListener<ContextRefreshedEvent> {
private final Logger LOG = LoggerFactory.getLogger("SRV-CONNECT");
@Autowired Registry sessionRegistry;
@Autowired ExecutorManager executorManager;
private final AtomicSet<ConnectId> pendingClientOff = new AtomicSet<>();
private final ClientOffWorker worker = new ClientOffWorker();
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
start();
}
public void start() {
ConcurrentUtils.createDaemonThread("ClientOff-Worker", worker).start();
}
@Override
public void disconnected(Channel channel) {
super.disconnected(channel);
......@@ -49,17 +69,24 @@ public class ClientNodeConnectionHandler extends ListenServerChannelHandler {
return Node.NodeType.CLIENT;
}
void clean(Channel channel) {
try {
ConnectId connectId = ConnectId.of(channel.getRemoteAddress(), channel.getLocalAddress());
sessionRegistry.clean(Collections.singletonList(connectId));
} catch (Throwable e) {
LOG.safeError("clean connection failed:", e);
}
void fireCancelClient(Channel channel) {
pendingClientOff.add(ConnectId.of(channel.getRemoteAddress(), channel.getLocalAddress()));
worker.wakeup();
}
private void fireCancelClient(Channel channel) {
// avoid block connect ConnectionEventExecutor thread pool
executorManager.getConnectClientExecutor().execute(() -> clean(channel));
private class ClientOffWorker extends WakeUpLoopRunnable {
@Override
public void runUnthrowable() {
Set<ConnectId> connectIds = pendingClientOff.getAndReset();
if (!CollectionUtils.isEmpty(connectIds)) {
LOG.info("disconnect count={}", connectIds.size());
sessionRegistry.clean(Lists.newArrayList(connectIds));
}
}
@Override
public int getWaitingMillis() {
return 5000;
}
}
}
......@@ -25,6 +25,8 @@ import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.server.session.TestUtils;
import com.alipay.sofa.registry.server.session.bootstrap.ExecutorManager;
import com.alipay.sofa.registry.server.session.registry.Registry;
import com.alipay.sofa.registry.util.ConcurrentUtils;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
......@@ -46,7 +48,9 @@ public class ClientNodeConnectionHandlerTest {
handler.sessionRegistry = mock(Registry.class);
handler.executorManager = new ExecutorManager(TestUtils.newSessionConfig("testDc"));
Channel channel = TestUtils.newChannel(9600, "127.0.0.1", 9888);
handler.clean(channel);
handler.start();
handler.fireCancelClient(channel);
ConcurrentUtils.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
verify(handler.sessionRegistry, times(1)).clean(anyList());
handler.disconnected(channel);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment