Commit 8a48e3ff authored by 昱恒's avatar 昱恒
Browse files

bolt channel attribute

parent aba1d1b1
Showing with 187 additions and 153 deletions
+187 -153
......@@ -16,9 +16,6 @@
*/
package com.alipay.sofa.registry.common.model.metaserver;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.google.common.base.Objects;
import java.util.Map;
/**
......@@ -67,11 +64,15 @@ public class ClientManagerAddress {
public static class AddressVersion {
private long version;
private final String address;
private String address;
private final boolean pub = true;
/** true:持久化关流pub false:临时关流pub */
private boolean pub = true;
private final boolean sub;
/** true:持久化关流sub false:临时关流sub */
private boolean sub;
public AddressVersion() {}
public AddressVersion(String address, boolean sub) {
this.address = address;
......@@ -112,12 +113,17 @@ public class ClientManagerAddress {
@Override
public String toString() {
return "AddressVersion{" +
"version=" + version +
", address='" + address + '\'' +
", pub=" + pub +
", sub=" + sub +
'}';
return "AddressVersion{"
+ "version="
+ version
+ ", address='"
+ address
+ '\''
+ ", pub="
+ pub
+ ", sub="
+ sub
+ '}';
}
}
}
......@@ -17,14 +17,11 @@
package com.alipay.sofa.registry.remoting.bolt;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.Connection;
import com.alipay.remoting.InvokeContext;
import com.alipay.sofa.registry.exception.SofaRegistryRuntimeException;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.util.StringFormatter;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.client.WebTarget;
/**
......@@ -38,11 +35,10 @@ public class BoltChannel implements Channel {
private AsyncContext asyncContext;
private BizContext bizContext;
private Map<String, Object> attributes;
public BoltChannel(Connection conn) {
if (conn == null) {
throw new SofaRegistryRuntimeException("conn is null.");
}
this.connection = conn;
}
......@@ -65,18 +61,16 @@ public class BoltChannel implements Channel {
@Override
public synchronized Object getAttribute(String key) {
return attributes == null ? null : attributes.get(key);
return connection == null ? null : connection.getAttribute(key);
}
@Override
public synchronized void setAttribute(String key, Object value) {
if (attributes == null) {
attributes = new HashMap<>();
}
if (value == null) {
attributes.remove(key);
connection.removeAttribute(key);
} else {
attributes.put(key, value);
connection.setAttribute(key, value);
}
}
......@@ -90,13 +84,6 @@ public class BoltChannel implements Channel {
this.connection.close();
}
public InvokeContext getInvokeContext() {
if (bizContext == null) {
return null;
}
return bizContext.getInvokeContext();
}
/**
* Getter method for property <tt>connection</tt>.
*
......@@ -124,24 +111,6 @@ public class BoltChannel implements Channel {
this.asyncContext = asyncContext;
}
/**
* Getter method for property <tt>bizContext</tt>.
*
* @return property value of bizContext
*/
public BizContext getBizContext() {
return bizContext;
}
/**
* Setter method for property <tt>bizContext</tt>.
*
* @param bizContext value to be assigned to property bizContext
*/
public void setBizContext(BizContext bizContext) {
this.bizContext = bizContext;
}
public void markProtobuf() {
if (!markProtobuf) {
this.markProtobuf = true;
......
......@@ -38,22 +38,19 @@ public final class BoltUtil {
public static Byte getBoltCustomSerializer(Channel channel) {
if (channel instanceof BoltChannel) {
BoltChannel boltChannel = (BoltChannel) channel;
InvokeContext invokeContext = boltChannel.getBizContext().getInvokeContext();
if (null != invokeContext) {
// set client custom codec for request command if not null
Object clientCustomCodec = invokeContext.get(InvokeContext.BOLT_CUSTOM_SERIALIZER);
if (null != clientCustomCodec) {
try {
return (Byte) clientCustomCodec;
} catch (ClassCastException e) {
throw new IllegalArgumentException(
"Illegal custom codec ["
+ clientCustomCodec
+ "], the type of value should be [byte], but now is ["
+ clientCustomCodec.getClass().getName()
+ "].");
}
// set client custom codec for request command if not null
Object clientCustomCodec = boltChannel.getAttribute(InvokeContext.BOLT_CUSTOM_SERIALIZER);
if (null != clientCustomCodec) {
try {
return (Byte) clientCustomCodec;
} catch (ClassCastException e) {
throw new IllegalArgumentException(
"Illegal custom codec ["
+ clientCustomCodec
+ "], the type of value should be [byte], but now is ["
+ clientCustomCodec.getClass().getName()
+ "].");
}
}
}
......
......@@ -41,7 +41,6 @@ public class SyncUserProcessorAdapter extends SyncUserProcessor {
@Override
public Object handleRequest(BizContext bizCtx, Object request) throws Exception {
BoltChannel boltChannel = new BoltChannel(bizCtx.getConnection());
boltChannel.setBizContext(bizCtx);
return userProcessorHandler.reply(boltChannel, request);
}
......
......@@ -41,8 +41,5 @@ public class AsyncUserProcessorAdapterTest {
TestUtils.assertException(
RuntimeException.class, () -> adapter.handleRequest(exceptionContext, null, "test"));
BizContext context = Mockito.mock(BizContext.class);
adapter.handleRequest(context, null, "test");
Mockito.verify(handler, Mockito.times(1)).received(Mockito.anyObject(), Mockito.anyObject());
}
}
......@@ -22,9 +22,10 @@ import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.exchange.RequestChannelClosedException;
import com.alipay.sofa.registry.remoting.exchange.RequestException;
import com.google.common.collect.Lists;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
......@@ -37,25 +38,19 @@ public class BoltUtilTest {
@Test
public void testGetBoltCustomSerializer() {
Assert.assertNull(BoltUtil.getBoltCustomSerializer(new MockChannel()));
BoltChannel boltChannel = new BoltChannel(null);
InvokeContext invokeContext = new InvokeContext();
invokeContext.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, new Object());
RemotingContext remotingContext =
new RemotingContext(
new MockChannelHandlerContext(), invokeContext, false, new ConcurrentHashMap<>());
BizContext bizContext = new DefaultBizContext(remotingContext);
boltChannel.setBizContext(bizContext);
boolean isException = false;
try {
BoltUtil.getBoltCustomSerializer(boltChannel);
} catch (Throwable r) {
isException = true;
}
Assert.assertTrue(isException);
invokeContext.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, new Byte("3"));
BoltChannel boltChannel = new BoltChannel(new Connection(createChn()));
boltChannel.setAttribute(InvokeContext.BOLT_CUSTOM_SERIALIZER, new Byte("3"));
Assert.assertEquals(new Byte("3"), BoltUtil.getBoltCustomSerializer(boltChannel));
}
private static io.netty.channel.Channel createChn() {
io.netty.channel.Channel chn = Mockito.mock(io.netty.channel.Channel.class);
Mockito.when(chn.attr(Mockito.any(AttributeKey.class)))
.thenReturn(Mockito.mock(Attribute.class));
return chn;
}
@Test
public void testBase() {
Connection conn = Mockito.mock(Connection.class);
......@@ -65,11 +60,6 @@ public class BoltUtilTest {
boltChannel.setAsyncContext(asyncContext);
Assert.assertTrue(asyncContext == boltChannel.getAsyncContext());
BizContext bizContext = Mockito.mock(BizContext.class);
boltChannel.setBizContext(bizContext);
Assert.assertTrue(bizContext == boltChannel.getBizContext());
Assert.assertNull(boltChannel.getWebTarget());
boltChannel.close();
Mockito.verify(conn, Mockito.times(1)).close();
......
......@@ -17,9 +17,17 @@
package com.alipay.sofa.registry.remoting.bolt;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.Connection;
import com.alipay.remoting.DefaultBizContext;
import com.alipay.remoting.RemotingContext;
import com.alipay.sofa.registry.exception.SofaRegistryRuntimeException;
import com.alipay.sofa.registry.remoting.ChannelHandler;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
......@@ -40,9 +48,7 @@ public class SyncUserProcessorAdapterTest {
Mockito.when(exceptionContext.getConnection()).thenThrow(new RuntimeException());
TestUtils.assertRunException(
RuntimeException.class, () -> adapter.handleRequest(exceptionContext, "test"));
BizContext context = Mockito.mock(BizContext.class);
adapter.handleRequest(context, "test");
Mockito.verify(handler, Mockito.times(1)).reply(Mockito.anyObject(), Mockito.anyObject());
}
}
......@@ -61,6 +61,8 @@ CREATE TABLE `client_manager_address` (
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`operation` varchar(128) NOT NULL COMMENT '操作类型',
`pub` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否持久化关流pub',
`sub` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否持久化关流sub',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_data_center_address` (`data_center`, `address`)
);
......
......@@ -60,6 +60,8 @@ CREATE TABLE `client_manager_address` (
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`operation` varchar(128) NOT NULL COMMENT '操作类型',
`pub` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否持久化关流pub',
`sub` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否持久化关流sub',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_data_center_address` (`data_center`, `address`) BLOCK_SIZE 16384 GLOBAL
) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'zstd_1.0' REPLICA_NUM = 3 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 10 COMMENT = '关流量pod数据表'
......
......@@ -18,7 +18,6 @@ package com.alipay.sofa.registry.server.meta.provide.data;
import com.alipay.sofa.registry.common.model.metaserver.ClientManagerAddress;
import com.alipay.sofa.registry.common.model.metaserver.ClientManagerAddress.AddressVersion;
import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
import com.alipay.sofa.registry.store.api.DBResponse;
import java.util.Set;
......@@ -44,7 +43,7 @@ public interface ClientManagerService {
*/
boolean clientOff(Set<String> ipSet);
boolean clientOffNew(Set<AddressVersion> address);
boolean clientOffWithSub(Set<AddressVersion> address);
/**
* query client off ips
......
......@@ -27,17 +27,17 @@ import com.alipay.sofa.registry.store.api.meta.ClientManagerAddressRepository;
import com.alipay.sofa.registry.util.ConcurrentUtils;
import com.alipay.sofa.registry.util.LoopRunnable;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.CollectionUtils;
import org.glassfish.jersey.internal.guava.Sets;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.glassfish.jersey.internal.guava.Sets;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
/**
* @author xiaojian.xj
......@@ -65,7 +65,7 @@ public class DefaultClientManagerService
@Override
public boolean clientOpen(Set<String> ipSet) {
Set<AddressVersion> addressSet = buildDefaultAddressVersions(ipSet);
if (addressSet == null) {
if (CollectionUtils.isEmpty(addressSet)) {
return false;
}
return clientManagerAddressRepository.clientOpen(addressSet);
......@@ -80,7 +80,7 @@ public class DefaultClientManagerService
@Override
public boolean clientOff(Set<String> ipSet) {
Set<AddressVersion> addressSet = buildDefaultAddressVersions(ipSet);
if (addressSet == null) {
if (CollectionUtils.isEmpty(addressSet)) {
return false;
}
return clientManagerAddressRepository.clientOff(addressSet);
......@@ -88,17 +88,17 @@ public class DefaultClientManagerService
private Set<AddressVersion> buildDefaultAddressVersions(Set<String> ipSet) {
if (CollectionUtils.isEmpty(ipSet)) {
return null;
return Collections.EMPTY_SET;
}
Set<AddressVersion> addressSet = Sets.newHashSetWithExpectedSize(ipSet.size());
for (String ip : ipSet) {
addressSet.add(new AddressVersion(ip, true));
addressSet.add(new AddressVersion(ip, true));
}
return addressSet;
}
@Override
public boolean clientOffNew(Set<AddressVersion> address) {
public boolean clientOffWithSub(Set<AddressVersion> address) {
return clientManagerAddressRepository.clientOff(address);
}
......@@ -114,7 +114,7 @@ public class DefaultClientManagerService
@Override
public boolean reduce(Set<String> ipSet) {
Set<AddressVersion> addressSet = buildDefaultAddressVersions(ipSet);
if (addressSet == null) {
if (CollectionUtils.isEmpty(addressSet)) {
return false;
}
return clientManagerAddressRepository.reduce(addressSet);
......
......@@ -29,9 +29,6 @@ import com.alipay.sofa.registry.store.api.OperationStatus;
import com.alipay.sofa.registry.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
......@@ -57,9 +54,9 @@ public class ClientManagerResource {
LoggerFactory.getLogger(ClientManagerResource.class, "[DBService]");
public static final TypeReference<Set<AddressVersion>> FORMAT =
new TypeReference<Set<AddressVersion>>() {};
new TypeReference<Set<AddressVersion>>() {};
@Autowired private ClientManagerService clientManagerService;
@Autowired private ClientManagerService clientManagerService;
/** Client off */
@POST
......@@ -81,8 +78,8 @@ public class ClientManagerResource {
/** Client off */
@POST
@Path("/clientOffNew")
public CommonResponse clientOffNew(@FormParam("ips") String ips) {
@Path("/clientOffWithSub")
public CommonResponse clientOffWithSub(@FormParam("ips") String ips) {
if (StringUtils.isBlank(ips)) {
return CommonResponse.buildFailedResponse("ips is empty");
}
......@@ -93,7 +90,7 @@ public class ClientManagerResource {
return CommonResponse.buildFailedResponse("ips is invalidate");
}
boolean ret = clientManagerService.clientOffNew(read);
boolean ret = clientManagerService.clientOffWithSub(read);
DB_LOGGER.info("client off result:{}, ips:{}", ret, ips);
......
......@@ -31,6 +31,7 @@ import org.h2.tools.Server;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.springframework.beans.BeansException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;
......@@ -184,5 +185,9 @@ public class AbstractH2DbTestBase extends AbstractTestBase implements Applicatio
}
@SpringBootApplication
public static class JdbcTestConfig {}
public static class JdbcTestConfig {
public static void main(String[] args) {
SpringApplication.run(JdbcTestConfig.class);
}
}
}
......@@ -20,13 +20,10 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.alipay.sofa.registry.common.model.Node;
import com.alipay.sofa.registry.common.model.ServerDataBox;
import com.alipay.sofa.registry.common.model.console.PersistenceData;
import com.alipay.sofa.registry.common.model.console.PersistenceDataBuilder;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.common.model.metaserver.ClientManagerAddress;
import com.alipay.sofa.registry.common.model.metaserver.ClientManagerAddress.AddressVersion;
import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
import com.alipay.sofa.registry.common.model.metaserver.nodes.DataNode;
import com.alipay.sofa.registry.common.model.slot.Slot;
import com.alipay.sofa.registry.common.model.slot.SlotConfig;
......@@ -70,6 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.Before;
......@@ -701,14 +699,12 @@ public class AbstractMetaServerTestBase extends AbstractTestBase {
}
@Override
public DBResponse<ProvideData> queryClientOffSet() {
public boolean clientOffWithSub(Set<AddressVersion> address) {
version.incrementAndGet();
ProvideData provideData =
new ProvideData(
new ServerDataBox(cache.get()),
ValueConstants.CLIENT_OFF_ADDRESS_DATA_ID,
version.get());
return DBResponse.ok(provideData).build();
return cache
.get()
.addAll(address.stream().map(AddressVersion::getAddress).collect(Collectors.toSet()));
}
@Override
......@@ -717,7 +713,8 @@ public class AbstractMetaServerTestBase extends AbstractTestBase {
Maps.newHashMapWithExpectedSize(cache.get().size());
for (Object address : cache.get()) {
clientOffAddress.put(
(String) address, new AddressVersion(System.currentTimeMillis(), (String) address));
(String) address,
new AddressVersion(System.currentTimeMillis(), (String) address, true));
}
ClientManagerAddress resp = new ClientManagerAddress(version.get(), clientOffAddress);
......
......@@ -18,7 +18,8 @@ package com.alipay.sofa.registry.server.meta.provide.data;
import static org.mockito.Mockito.when;
import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
import com.alipay.sofa.registry.common.model.metaserver.ClientManagerAddress;
import com.alipay.sofa.registry.common.model.metaserver.ClientManagerAddress.AddressVersion;
import com.alipay.sofa.registry.server.meta.AbstractH2DbTestBase;
import com.alipay.sofa.registry.server.meta.MetaLeaderService;
import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig;
......@@ -27,8 +28,10 @@ import com.alipay.sofa.registry.store.api.OperationStatus;
import com.alipay.sofa.registry.store.api.meta.ClientManagerAddressRepository;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Resource;
import org.junit.Assert;
......@@ -64,23 +67,78 @@ public class ClientManagerServiceTest extends AbstractH2DbTestBase {
clientManagerService.clientOff(clientOffSet);
Thread.sleep(2000);
DBResponse<ProvideData> clientOffResponse = clientManagerService.queryClientOffSet();
DBResponse<ClientManagerAddress> clientOffResponse =
clientManagerService.queryClientOffAddress();
Assert.assertEquals(clientOffResponse.getOperationStatus(), OperationStatus.SUCCESS);
ProvideData clientOffData = clientOffResponse.getEntity();
ClientManagerAddress clientOffData = clientOffResponse.getEntity();
Long v1 = clientOffData.getVersion();
Set<String> set1 = (Set<String>) clientOffData.getProvideData().getObject();
Assert.assertTrue(v1 > -1L);
Assert.assertEquals(clientOffSet, clientOffData.getClientOffAddress().keySet());
for (Entry<String, AddressVersion> entry : clientOffData.getClientOffAddress().entrySet()) {
Assert.assertTrue(entry.getValue().isPub());
Assert.assertTrue(entry.getValue().isSub());
}
Set<AddressVersion> address = Sets.newHashSet();
for (String s : clientOffSet) {
address.add(new AddressVersion(s, false));
}
clientManagerService.clientOffWithSub(address);
Thread.sleep(2000);
clientOffResponse = clientManagerService.queryClientOffAddress();
Assert.assertEquals(clientOffResponse.getOperationStatus(), OperationStatus.SUCCESS);
clientOffData = clientOffResponse.getEntity();
v1 = clientOffData.getVersion();
Assert.assertTrue(v1 > -1L);
Assert.assertEquals(clientOffSet, clientOffData.getClientOffAddress().keySet());
for (Entry<String, AddressVersion> entry : clientOffData.getClientOffAddress().entrySet()) {
Assert.assertTrue(entry.getValue().isPub());
Assert.assertTrue(entry.getValue().isSub());
}
address = Sets.newHashSet(new AddressVersion("4.4.4.4", false));
clientOffSet.add("4.4.4.4");
clientManagerService.clientOffWithSub(address);
Thread.sleep(2000);
clientOffResponse = clientManagerService.queryClientOffAddress();
Assert.assertEquals(clientOffResponse.getOperationStatus(), OperationStatus.SUCCESS);
clientOffData = clientOffResponse.getEntity();
v1 = clientOffData.getVersion();
Assert.assertTrue(v1 > -1L);
AddressVersion addressVersion = clientOffData.getClientOffAddress().get("4.4.4.4");
Assert.assertTrue(addressVersion.isPub());
Assert.assertFalse(addressVersion.isSub());
clientManagerService.clientOff(Collections.singleton("4.4.4.4"));
Thread.sleep(2000);
clientOffResponse = clientManagerService.queryClientOffAddress();
Assert.assertEquals(clientOffResponse.getOperationStatus(), OperationStatus.SUCCESS);
clientOffData = clientOffResponse.getEntity();
long pre = v1;
v1 = clientOffData.getVersion();
Assert.assertEquals(pre, v1.longValue());
Assert.assertTrue(v1 > -1L);
Assert.assertEquals(clientOffSet, set1);
addressVersion = clientOffData.getClientOffAddress().get("4.4.4.4");
Assert.assertTrue(addressVersion.isPub());
Assert.assertFalse(addressVersion.isSub());
clientManagerService.clientOpen(clientOpenSet);
Thread.sleep(2000);
DBResponse<ProvideData> clientOpenResponse = clientManagerService.queryClientOffSet();
DBResponse<ClientManagerAddress> clientOpenResponse =
clientManagerService.queryClientOffAddress();
Assert.assertEquals(clientOpenResponse.getOperationStatus(), OperationStatus.SUCCESS);
ProvideData clientOpenData = clientOpenResponse.getEntity();
ClientManagerAddress clientOpenData = clientOpenResponse.getEntity();
Long v2 = clientOpenData.getVersion();
Set<String> set2 = (Set<String>) clientOpenData.getProvideData().getObject();
Assert.assertTrue(v2 > v1);
Assert.assertEquals(Sets.difference(clientOffSet, clientOpenSet), set2);
Assert.assertEquals(
Sets.difference(clientOffSet, clientOpenSet),
clientOpenData.getClientOffAddress().keySet());
/** check expire before clean */
List<String> expireAddress = clientManagerAddressRepository.getExpireAddress(new Date(), 100);
......
......@@ -17,7 +17,6 @@
package com.alipay.sofa.registry.server.meta.remoting.handler;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
......@@ -27,13 +26,10 @@ import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.common.model.metaserver.FetchSystemPropertyRequest;
import com.alipay.sofa.registry.common.model.metaserver.FetchSystemPropertyResult;
import com.alipay.sofa.registry.server.meta.AbstractMetaServerTestBase;
import com.alipay.sofa.registry.server.meta.provide.data.ClientManagerService;
import com.alipay.sofa.registry.server.meta.provide.data.DefaultProvideDataNotifier;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataService;
import com.alipay.sofa.registry.server.meta.resource.BlacklistDataResource;
import com.alipay.sofa.registry.server.meta.resource.ClientManagerResource;
import com.alipay.sofa.registry.test.TestUtils;
import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......@@ -56,11 +52,7 @@ public class FetchSystemPropertyRequestHandlerTest extends AbstractMetaServerTes
private DefaultProvideDataNotifier dataNotifier = mock(DefaultProvideDataNotifier.class);
private FetchSystemPropertyRequest newBlacklistRequest() {
return new FetchSystemPropertyRequest(ValueConstants.BLACK_LIST_DATA_ID, anyLong());
}
private FetchSystemPropertyRequest newClientOffRequest() {
return new FetchSystemPropertyRequest(ValueConstants.CLIENT_OFF_ADDRESS_DATA_ID, anyLong());
return new FetchSystemPropertyRequest(ValueConstants.BLACK_LIST_DATA_ID, 1L);
}
@Before
......
......@@ -39,6 +39,8 @@ public class ClientManagerResourceTest extends AbstractMetaServerTestBase {
private static final String CLIENT_OFF_STR = "1.1.1.1;2.2.2.2";
private static final String CLIENT_OPEN_STR = "2.2.2.2;3.3.3.3";
private static final String CLIENT_OFF_NEW_STR =
"[{\"address\":\"1.1.1.1\",\"pub\":true,\"sub\":false},{\"address\":\"2.2.2.2\",\"pub\":true,\"sub\":false}]";
@Before
public void beforeClientManagerResourceTest() {
......@@ -58,4 +60,20 @@ public class ClientManagerResourceTest extends AbstractMetaServerTestBase {
Assert.assertEquals(query.getData().getVersion(), 2L);
Assert.assertEquals(query.getData().getClientOffAddress().size(), 1);
}
@Test
public void testClientManagerNew() {
clientManagerResource.clientOffWithSub(CLIENT_OFF_NEW_STR);
clientManagerResource.clientOff(CLIENT_OFF_STR);
clientManagerResource.clientOpen(CLIENT_OPEN_STR);
GenericResponse<ClientManagerAddress> query = clientManagerResource.query();
Assert.assertTrue(query.isSuccess());
Assert.assertEquals(query.getData().getVersion(), 3L);
Assert.assertEquals(query.getData().getClientOffAddress().size(), 1);
}
}
......@@ -61,6 +61,8 @@ CREATE TABLE `client_manager_address` (
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`operation` varchar(128) NOT NULL COMMENT '操作类型',
`pub` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否持久化关流pub',
`sub` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否持久化关流sub',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_data_center_address` (`data_center`, `address`)
);
......
......@@ -60,6 +60,8 @@ CREATE TABLE `client_manager_address` (
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`operation` varchar(128) NOT NULL COMMENT '操作类型',
`pub` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否持久化关流pub',
`sub` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否持久化关流sub',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_data_center_address` (`data_center`, `address`) BLOCK_SIZE 16384 GLOBAL
) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'zstd_1.0' REPLICA_NUM = 3 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 10 COMMENT = '关流量pod数据表'
......
......@@ -16,7 +16,6 @@
*/
package com.alipay.sofa.registry.server.session.connections;
import com.alipay.remoting.InvokeContext;
import com.alipay.sofa.registry.common.model.ConnectId;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.net.NetUtil;
......@@ -89,10 +88,7 @@ public class ConnectionsService {
if (ipSet.contains(ip)) {
if (StringUtils.isNotBlank(key)) {
BoltChannel boltChannel = (BoltChannel) channel;
InvokeContext invokeContext = boltChannel.getInvokeContext();
if (null != invokeContext) {
invokeContext.put(key, value);
}
boltChannel.setAttribute(key, value);
}
connections.add(ConnectId.of(channel.getRemoteAddress(), channel.getLocalAddress()));
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment