Commit cb9e3e22 authored by sohudo's avatar sohudo
Browse files

Master版本更新为1.6版本

parent 1ab29040
Showing with 1107 additions and 1353 deletions
+1107 -1353
......@@ -2,9 +2,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.mycat.mycat</groupId>
<groupId>io.mycat</groupId>
<artifactId>Mycat-server</artifactId>
<version>2.0-dev</version>
<version>1.6.5-release</version>
<packaging>jar</packaging>
<name>Mycat-server</name>
<description>The project of Mycat-server</description>
......@@ -13,7 +13,7 @@
<properties>
<app.encoding>UTF-8</app.encoding>
<!-- maven.build.timestamp.format>yyyyMMdd</maven.build.timestamp.format>
<buildNumber>${maven.build.timestamp}</buildNumber -->
<buildNumber>${maven.build.timestamp}</buildNumber -->
<maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss</maven.build.timestamp.format>
<version.template.file>version.txt.template</version.template.file>
<version.file>version.txt</version.file>
......@@ -43,26 +43,8 @@
</snapshotRepository>
</distributionManagement>
<dependencies>
<!-- test -->
<dependency>
<groupId>org.codehaus.jsr166-mirror</groupId>
<artifactId>jsr166y</artifactId>
<version>1.7.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<!-- driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.35</version>
</dependency>
<!-- <dependency> <groupId>com.google.guava</groupId> <artifactId>guava-parent</artifactId>
<version>18.0</version> </dependency> -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
......@@ -78,103 +60,177 @@
<artifactId>leveldb-api</artifactId>
<version>0.7</version>
</dependency>
<dependency>
<groupId>com.sequoiadb</groupId>
<artifactId>sequoiadb-driver</artifactId>
<version>1.12</version>
</dependency>
<!-- common -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
<version>19.0</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.0</version>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.26</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.2</version>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.35</version>
</dependency>
<!-- parse -->
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>1.5.4</version>
<type>jar</type>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache-core</artifactId>
<version>2.6.11</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.14</version>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>1.0.7</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.16</version>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
<groupId>org.codehaus.jsr166-mirror</groupId>
<artifactId>jsr166y</artifactId>
<version>1.7.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
<version>1.7</version>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.2.1</version>
<type>jar</type>
</dependency>
<!-- cache -->
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>1.0.7</version>
<groupId>com.sequoiadb</groupId>
<artifactId>sequoiadb-driver</artifactId>
<version>1.12</version>
</dependency>
<!--DOM4J FOR XML -->
<dependency>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache-core</artifactId>
<version>2.6.11</version>
<scope>compile</scope>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
<exclusions>
<exclusion>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- zookeeper -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.9.0</version>
<version>2.11.0</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
<scope>compile</scope>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.3</version>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.11.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.3</version>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.12</version>
</dependency>
<!-- joda日期处理工具 -->
<!-- joda日期处理工具 -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.8.2</version>
<version>2.9.3</version>
</dependency>
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.10</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-buffer -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.9.Final</version>
</dependency>
</dependencies>
......@@ -287,12 +343,12 @@
<goal>replace</goal>
</goals>
<configuration>
<file>${project.basedir}/src/main/java/io/mycat/server/Versions.template</file>
<outputFile>${project.basedir}/src/main/java/io/mycat/server/Versions.java</outputFile>
<file>${project.basedir}/src/main/java/io/mycat/config/Versions.template</file>
<outputFile>${project.basedir}/src/main/java/io/mycat/config/Versions.java</outputFile>
<replacements>
<replacement>
<token>@server-version@</token>
<value>5.5.8-mycat-${project.version}-${timestamp}</value>
<value>5.6.29-mycat-${project.version}-${timestamp}</value>
</replacement>
</replacements>
</configuration>
......@@ -305,11 +361,12 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<source>1.7</source>
<target>1.7</target>
<encoding>${app.encoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
......@@ -364,6 +421,7 @@
</additionalConfig>
</configuration>
</plugin>
<!-- -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
......@@ -391,15 +449,15 @@
<systemProperty>MYCAT_HOME=.</systemProperty>
</systemProperties>
<extraArguments>
<extraArgument>-server</extraArgument>
<extraArgument>-server </extraArgument>
<extraArgument>-XX:MaxPermSize=64M</extraArgument>
<extraArgument>-XX:+AggressiveOpts</extraArgument>
<extraArgument>-XX:MaxDirectMemorySize=2G</extraArgument>
<!-- 远程JMX -->
<extraArgument>-Dcom.sun.management.jmxremote</extraArgument>
<extraArgument>-Dcom.sun.management.jmxremote </extraArgument>
<extraArgument>-Dcom.sun.management.jmxremote.port=1984</extraArgument>
<extraArgument>-Dcom.sun.management.jmxremote.authenticate=false</extraArgument>
<extraArgument>-Dcom.sun.management.jmxremote.ssl=false</extraArgument>
<extraArgument>-Dcom.sun.management.jmxremote.authenticate=false </extraArgument>
<extraArgument>-Dcom.sun.management.jmxremote.ssl=false </extraArgument>
<extraArgument>-Xmx4G</extraArgument>
<extraArgument>-Xms1G</extraArgument>
</extraArguments>
......@@ -430,14 +488,22 @@
<name>configuration.directory.in.classpath.first</name>
<value>conf</value>
</property>
<property>
<name>wrapper.ping.timeout</name>
<value>120</value>
</property>
<property>
<name>set.default.REPO_DIR</name>
<value>lib</value>
</property>
<property>
<name>wrapper.logfile.maxsize</name>
<value>512m</value>
</property>
<property>
<name>wrapper.logfile.maxfiles</name>
<value>30</value>
</property>
<property>
<name>wrapper.logfile</name>
<value>logs/wrapper.log</value>
......
......@@ -37,7 +37,7 @@
<outputDirectory>mycat/conf</outputDirectory>
<excludes>
<exclude>*.dtd</exclude>
<exclude>log4j.*</exclude>
<exclude>log4j*</exclude>
</excludes>
</fileSet>
<fileSet>
......
......@@ -37,7 +37,7 @@
<outputDirectory>mycat/conf</outputDirectory>
<excludes>
<exclude>*.dtd</exclude>
<exclude>log4j.*</exclude>
<exclude>log4j*</exclude>
</excludes>
</fileSet>
<fileSet>
......
......@@ -37,7 +37,7 @@
<outputDirectory>mycat/conf</outputDirectory>
<excludes>
<exclude>*.dtd</exclude>
<exclude>log4j.*</exclude>
<exclude>log4j*</exclude>
</excludes>
</fileSet>
<fileSet>
......
......@@ -38,7 +38,7 @@
<outputDirectory>mycat/conf</outputDirectory>
<excludes>
<exclude>*.dtd</exclude>
<exclude>log4j.*</exclude>
<exclude>log4j*</exclude>
</excludes>
</fileSet>
<fileSet>
......
......@@ -37,7 +37,7 @@
<outputDirectory>mycat/conf</outputDirectory>
<excludes>
<exclude>*.dtd</exclude>
<exclude>log4j.*</exclude>
<exclude>log4j*</exclude>
</excludes>
</fileSet>
<fileSet>
......
REM check JAVA_HOME & java
REM Resolve the JVM launcher. NOTE: the value must NOT itself contain quote
REM characters -- the quoted invocation below ("%JAVA_CMD%") adds them, and a
REM pre-quoted value would produce ""...""; which cmd.exe rejects.
set "JAVA_CMD=%JAVA_HOME%\bin\java"
if "%JAVA_HOME%" == "" goto noJavaHome
if exist "%JAVA_HOME%\bin\java.exe" goto mainEntry
:noJavaHome
echo ---------------------------------------------------
echo WARN: JAVA_HOME environment variable is not set.
echo ---------------------------------------------------
REM Fall back to whatever java is on the PATH.
set "JAVA_CMD=java"
:mainEntry
REM set HOME_DIR: MYCAT_HOME is the parent of the directory this script runs from.
set "CURR_DIR=%cd%"
cd ..
set "MYCAT_HOME=%cd%"
REM Quote the path so directories containing spaces work; /d also switches drive.
cd /d "%CURR_DIR%"
"%JAVA_CMD%" -Xms256M -Xmx1G -XX:MaxPermSize=64M -DMYCAT_HOME=%MYCAT_HOME% -cp "..\conf;..\lib\*" demo.ZkCreate
\ No newline at end of file
#!/bin/bash
echo "check JAVA_HOME & java"
# Resolve the JVM launcher: prefer $JAVA_HOME/bin/java, fall back to PATH.
JAVA_CMD="$JAVA_HOME/bin/java"
MAIN_CLASS=demo.ZkCreate
if [ ! -d "$JAVA_HOME" ]; then
    echo "---------------------------------------------------"
    echo "WARN: JAVA_HOME environment variable is not set."
    echo "---------------------------------------------------"
    JAVA_CMD=java
fi
echo "---------set HOME_DIR------------"
# MYCAT_HOME is the parent of the directory this script is executed from.
CURR_DIR=$(pwd)
cd ..
MYCAT_HOME=$(pwd)
# Quote all path expansions so installations under paths with spaces work.
cd "$CURR_DIR"
"$JAVA_CMD" -Xms256M -Xmx1G -XX:MaxPermSize=64M -DMYCAT_HOME="$MYCAT_HOME" -cp "$MYCAT_HOME/conf:$MYCAT_HOME/lib/*" "$MAIN_CLASS"
<?xml version="1.0" encoding="UTF-8"?>
<!--
- Copyright 1999-2012 Alibaba Group.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<!-- Log4j 1.x configuration. Root logger is set to INFO and writes only to the
     rolling file appender; the console appender below is declared but not
     referenced by <root>. -->
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<!-- Console appender (unused by <root> as configured). -->
<appender name="ConsoleAppender" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{MM-dd HH:mm:ss.SSS} %5p [%t] (%F:%L) -%m%n" />
</layout>
</appender>
<!-- Rolling file appender: ${MYCAT_HOME}/logs/mycat.log, rolled at 1000KB,
     keeping up to 10 backups.
     NOTE(review): Append=false truncates the active log file on every
     process restart - confirm this is intended. -->
<appender name="FILE" class="org.apache.log4j.RollingFileAppender">
<param name="file" value="${MYCAT_HOME}/logs/mycat.log" />
<param name="Append" value="false"/>
<param name="MaxFileSize" value="1000KB"/>
<param name="MaxBackupIndex" value="10"/>
<param name="encoding" value="UTF-8" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{MM/dd HH:mm:ss.SSS} %5p [%t] (%F:%L) -%m%n" />
</layout>
</appender>
<root>
<level value="info" />
<appender-ref ref="FILE" />
<!--<appender-ref ref="FILE" />-->
</root>
</log4j:configuration>
\ No newline at end of file
#update
#Thu Sep 10 16:14:18 CST 2015
jdbchost=0
package demo.catlets;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
import com.alibaba.druid.sql.ast.statement.SQLInsertStatement.ValuesClause;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import io.mycat.MycatServer;
import io.mycat.cache.LayerCachePool;
import io.mycat.route.RouteResultset;
import io.mycat.route.RouteResultsetNode;
import io.mycat.route.factory.RouteStrategyFactory;
import io.mycat.server.ErrorCode;
import io.mycat.server.MySQLFrontConnection;
import io.mycat.server.config.node.SchemaConfig;
import io.mycat.server.config.node.SystemConfig;
import io.mycat.server.config.node.TableConfig;
import io.mycat.server.parser.ServerParse;
import io.mycat.server.sequence.IncrSequenceMySQLHandler;
import io.mycat.server.sequence.IncrSequencePropHandler;
import io.mycat.server.sequence.SequenceHandler;
import io.mycat.sqlengine.Catlet;
import io.mycat.sqlengine.EngineCtx;
import io.mycat.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Catlet that rewrites a batch INSERT so that every VALUES row receives an
 * auto-generated sequence id for the table's primary key, then routes and
 * executes the rewritten statement.
 *
 * @author 兵临城下
 * @date 2015/03/20
 */
public class BatchInsertSequence implements Catlet {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(BatchInsertSequence.class);

    private RouteResultset rrs;              // routing result of the rewritten SQL
    private String executeSql;               // INSERT rewritten with pk column + sequence values
    private SequenceHandler sequenceHandler; // generator of sequence ids

    // Captured in route(...) so the rewritten SQL can be routed again in processSQL(...).
    private SystemConfig sysConfig;
    private SchemaConfig schema;
    private int sqltype;
    private String charset;
    private MySQLFrontConnection sc;
    private LayerCachePool cachePool;

    /**
     * Routes the SQL prepared by {@link #route} and executes it on the
     * resolved data nodes; writes an error packet to the client when no
     * data node can be resolved.
     */
    @Override
    public void processSQL(String sql, EngineCtx ctx) {
        try {
            getRoute(executeSql);
            // rrs stays null when routing failed (see getRoute); previously this
            // surfaced only as a logged NullPointerException.
            RouteResultsetNode[] nodes = (rrs == null) ? null : rrs.getNodes();
            if (nodes == null || nodes.length == 0 || nodes[0].getName() == null
                    || nodes[0].getName().equals("")) {
                ctx.getSession().getSource().writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
                        "No dataNode found ,please check tables defined in schema:"
                                + ctx.getSession().getSource().getSchema());
                return;
            }
            sc.getSession2().execute(rrs, sqltype); // run the routed INSERT
        } catch (Exception e) {
            LOGGER.error("BatchInsertSequence.processSQL(String sql, EngineCtx ctx)", e);
        }
    }

    /**
     * Parses the original INSERT, appends the table's primary-key column and a
     * freshly generated sequence value to every VALUES clause, and stores the
     * rewritten SQL in {@link #executeSql}. Parse/config failures are logged
     * and leave {@code executeSql} unset.
     */
    @Override
    public void route(SystemConfig sysConfig, SchemaConfig schema, int sqlType,
            String realSQL, String charset, MySQLFrontConnection sc,
            LayerCachePool cachePool) {
        int rs = ServerParse.parse(realSQL);
        this.sqltype = rs & 0xff;
        this.sysConfig = sysConfig;
        this.schema = schema;
        this.charset = charset;
        this.sc = sc;
        this.cachePool = cachePool;
        try {
            MySqlStatementParser parser = new MySqlStatementParser(realSQL);
            SQLStatement statement = parser.parseStatement();
            MySqlInsertStatement insert = (MySqlInsertStatement) statement;
            if (insert.getValuesList() != null) {
                String tableName = StringUtil.getTableName(realSQL).toUpperCase();
                TableConfig tableConfig = schema.getTables().get(tableName);
                if (tableConfig == null) {
                    // previously a bare NullPointerException swallowed by the
                    // catch below; fail with a diagnosable message instead
                    throw new IllegalArgumentException(
                            "can not find table [" + tableName + "] in schema config");
                }
                String primaryKey = tableConfig.getPrimaryKey(); // pk column to fill
                SQLIdentifierExpr sqlIdentifierExpr = new SQLIdentifierExpr();
                sqlIdentifierExpr.setName(primaryKey);
                insert.getColumns().add(sqlIdentifierExpr);
                if (sequenceHandler == null) {
                    sequenceHandler = lookupSequenceHandler();
                }
                for (ValuesClause vc : insert.getValuesList()) {
                    SQLIntegerExpr sqlIntegerExpr = new SQLIntegerExpr();
                    // tableName is already upper-cased above
                    long value = sequenceHandler.nextId(tableName);
                    sqlIntegerExpr.setNumber(value); // append the generated sequence value
                    vc.addValue(sqlIntegerExpr);
                }
                this.executeSql = insert.toString();
            }
        } catch (Exception e) {
            LOGGER.error("BatchInsertSequence.route(......)", e);
        }
    }

    /**
     * Resolves the sequence handler configured for this server instance.
     *
     * @throws IllegalArgumentException when the configured handler type is
     *         neither MYSQLDB nor LOCALFILE
     */
    private SequenceHandler lookupSequenceHandler() {
        int seqHandlerType = MycatServer.getInstance().getConfig().getSystem().getSequnceHandlerType();
        switch (seqHandlerType) {
        case SystemConfig.SEQUENCEHANDLER_MYSQLDB:
            return IncrSequenceMySQLHandler.getInstance();
        case SystemConfig.SEQUENCEHANDLER_LOCALFILE:
            return IncrSequencePropHandler.getInstance();
        default:
            throw new IllegalArgumentException("Invalid sequnce handler type " + seqHandlerType);
        }
    }

    /**
     * Routes {@code sql} with the parameters captured in {@link #route} and
     * stores the result in {@link #rrs}; failures are logged and leave
     * {@code rrs} unchanged.
     */
    private void getRoute(String sql) {
        try {
            rrs = RouteStrategyFactory.getRouteStrategy().route(sysConfig, schema, sqltype, sql, charset, sc, cachePool);
        } catch (Exception e) {
            LOGGER.error("BatchInsertSequence.getRoute(String sql)", e);
        }
    }
}
package demo.catlets;
import io.mycat.cache.LayerCachePool;
import io.mycat.server.MySQLFrontConnection;
import io.mycat.server.config.node.SchemaConfig;
import io.mycat.server.config.node.SystemConfig;
import io.mycat.server.packet.RowDataPacket;
import io.mycat.sqlengine.AllJobFinishedListener;
import io.mycat.sqlengine.Catlet;
import io.mycat.sqlengine.EngineCtx;
import io.mycat.sqlengine.SQLJobHandler;
import io.mycat.util.ByteUtil;
import io.mycat.util.ResultSetUtil;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Demo Catlet: runs the incoming SQL sequentially on dn1/dn2/dn3 through a
 * {@link DirectDBJoinHandler} and writes an EOF packet once every job has
 * finished. Routing is a no-op for this demo.
 */
public class MyHellowJoin implements Catlet {

    @Override
    public void processSQL(String sql, EngineCtx ctx) {
        final String[] targetNodes = { "dn1", "dn2", "dn3" };
        DirectDBJoinHandler primaryHandler = new DirectDBJoinHandler(ctx);
        ctx.executeNativeSQLSequnceJob(targetNodes, sql, primaryHandler);
        // Terminate the result set only after all node jobs completed.
        AllJobFinishedListener eofWriter = new AllJobFinishedListener() {
            @Override
            public void onAllJobFinished(EngineCtx finishedCtx) {
                finishedCtx.writeEof();
            }
        };
        ctx.setAllJobFinishedListener(eofWriter);
    }

    @Override
    public void route(SystemConfig sysConfig, SchemaConfig schema, int sqlType,
            String realSQL, String charset, MySQLFrontConnection sc,
            LayerCachePool cachePool) {
        // Intentionally empty: this demo does not pre-route.
    }
}
/**
 * Collects rows of the primary (A) table from each data node, and for every
 * batch of collected ids issues a parallel secondary query against the B
 * table ("hotnews"), whose results are merged by {@link MyRowOutPutDataHandler}.
 */
class DirectDBJoinHandler implements SQLJobHandler {
// Column metadata of the A-table result, captured from the first header seen.
private List<byte[]> fields;
private final EngineCtx ctx;
public DirectDBJoinHandler(EngineCtx ctx) {
super();
this.ctx = ctx;
}
// id -> raw A-table row bytes, pending join with B-table rows.
private Map<String, byte[]> rows = new ConcurrentHashMap<String, byte[]>();
// FIFO of ids awaiting inclusion in the next batched B-table query.
private ConcurrentLinkedQueue<String> ids = new ConcurrentLinkedQueue<String>();
@Override
public void onHeader(String dataNode, byte[] header, List<byte[]> fields) {
this.fields = fields;
}
/**
 * Drains up to roughly batchSize ids from the queue and fires one parallel
 * "select ... where id in (...)" job for them. Does nothing when the queue
 * is empty. NOTE(review): the post-increment check (count++ > batchSize)
 * lets a batch exceed batchSize by a couple of entries - presumably
 * acceptable; confirm if exact batching matters.
 */
private void createQryJob(int batchSize) {
int count = 0;
Map<String, byte[]> batchRows = new ConcurrentHashMap<String, byte[]>();
String theId = null;
StringBuilder sb = new StringBuilder().append('(');
while ((theId = ids.poll()) != null) {
batchRows.put(theId, rows.remove(theId));
sb.append(theId).append(',');
if (count++ > batchSize) {
break;
}
}
if (count == 0) {
return;
}
// Replace the trailing comma with the closing parenthesis.
sb.deleteCharAt(sb.length() - 1).append(')');
String querySQL = "select b.id, b.title from hotnews b where id in "
+ sb;
ctx.executeNativeSQLParallJob(new String[] { "dn1", "dn2", "dn3" },
querySQL, new MyRowOutPutDataHandler(fields, ctx, batchRows));
}
@Override
public boolean onRowData(String dataNode, byte[] rowData) {
// Column 0 is used as the join id.
String id = ResultSetUtil.getColumnValAsString(rowData, fields, 0);
// Stash the row until its B-table counterpart arrives.
rows.put(id, rowData);
ids.offer(id);
int batchSize = 999;
// Once ~1000 ids are pending, fire a batched secondary query.
if (ids.size() > batchSize) {
createQryJob(batchSize);
}
return false;
}
@Override
public void finished(String dataNode, boolean failed) {
if (!failed) {
// Flush whatever ids remain in one final batch.
createQryJob(Integer.MAX_VALUE);
}
// no more jobs
ctx.endJobInput();
}
}
/**
 * Handles rows of the secondary (B-table) query: matches each B row to its
 * pending A row by id, appends the B "name" column to the A row, and writes
 * the merged row to the client.
 */
class MyRowOutPutDataHandler implements SQLJobHandler {
// Column metadata of the A-table result (fixed at construction).
private final List<byte[]> afields;
// Column metadata of the B-table result, captured from the header.
private List<byte[]> bfields;
private final EngineCtx ctx;
// id -> pending A-table row bytes; entries are consumed (removed) on match.
private final Map<String, byte[]> arows;
public MyRowOutPutDataHandler(List<byte[]> afields, EngineCtx ctx,
Map<String, byte[]> arows) {
super();
this.afields = afields;
this.ctx = ctx;
this.arows = arows;
}
@Override
public void onHeader(String dataNode, byte[] header, List<byte[]> bfields) {
this.bfields=bfields;
// Emit the combined A+B header once per secondary job.
ctx.writeHeader(afields, bfields);
}
@Override
public boolean onRowData(String dataNode, byte[] rowData) {
RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(rowData, bfields);
// Extract the id column (field 0) of the B row.
String id = ByteUtil.getString(rowDataPkg.fieldValues.get(0));
byte[] bname = rowDataPkg.fieldValues.get(1);
// Look up (and consume) the A-table row with the same id.
// NOTE(review): arow is null when no A row matches - parseRowData would
// then receive null; confirm upstream guarantees a match.
byte[] arow = arows.remove(id);
rowDataPkg = ResultSetUtil.parseRowData(arow, afields);
// Append the b.name column to the A row.
rowDataPkg.add(bname);
ctx.writeRow(rowDataPkg);
// EngineCtx.LOGGER.info("out put row ");
return false;
}
@Override
public void finished(String dataNode, boolean failed) {
}
}
package demo.catlets;
import io.mycat.cache.LayerCachePool;
import io.mycat.route.RouteResultset;
import io.mycat.route.RouteResultsetNode;
import io.mycat.route.factory.RouteStrategyFactory;
import io.mycat.server.ErrorCode;
import io.mycat.server.Fields;
import io.mycat.server.MySQLFrontConnection;
import io.mycat.server.config.node.SchemaConfig;
import io.mycat.server.config.node.SystemConfig;
import io.mycat.server.packet.FieldPacket;
import io.mycat.server.packet.RowDataPacket;
import io.mycat.server.parser.ServerParse;
import io.mycat.sqlengine.AllJobFinishedListener;
import io.mycat.sqlengine.Catlet;
import io.mycat.sqlengine.EngineCtx;
import io.mycat.sqlengine.SQLJobHandler;
import io.mycat.sqlengine.sharejoin.JoinParser;
import io.mycat.util.ByteUtil;
import io.mycat.util.ResultSetUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
//import org.opencloudb.route.RouteStrategy;
//import org.opencloudb.route.impl.DruidMysqlRouteStrategy;
//import org.opencloudb.parser.druid.DruidParser;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLSelectQuery;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
/**
 * Sharded-join Catlet: parses the SELECT into a primary (left) query and a
 * child (right) query template, executes the primary query on every routed
 * data node, batches the collected join-key values into IN(...) lists for the
 * child query, and merges matching rows back to the client.
 *
 * @author sohudo[http://blog.csdn.net/wind520]
 * @create 2015年01月22日 下午6:50:23
 * @version 0.0.1
 */
public class ShareJoin implements Catlet {
private EngineCtx ctx;
private RouteResultset rrs ;
private JoinParser joinParser;
// Primary-key value -> raw primary-table row bytes, pending join.
private Map<String, byte[]> rows = new ConcurrentHashMap<String, byte[]>();
// Primary-key value -> join-key value, pending inclusion in the next IN(...) batch.
private Map<String,String> ids = new ConcurrentHashMap<String,String>();
//private ConcurrentLinkedQueue<String> ids = new ConcurrentLinkedQueue<String>();
private List<byte[]> fields; // fields of the primary (left) table
private ArrayList<byte[]>  allfields;// all fields (left + right)
private boolean isMfield=false;
private int mjob=0;   // number of primary-table jobs finished so far
private int maxjob=0; // total number of primary-table jobs (= data nodes)
private int joinindex=0;// position of the join column in the primary-table row
private int sendField=0;
private boolean childRoute=false;
private boolean jointTableIsData=false;
// Type of the join-key column. Usually int/long; tracked so that varchar
// join keys get quoted correctly in the generated IN(...) list.
// See: io.mycat.server.packet.FieldPacket field: public int type;
// See: http://dev.mysql.com/doc/internals/en/com-query-response.html#packet-Protocol::ColumnDefinition
private int joinKeyType = Fields.FIELD_TYPE_LONG; // join key defaults to an integer type
// Captured in route(...) for re-routing in processSQL/createQryJob.
private SystemConfig sysConfig;
private SchemaConfig schema;
private int sqltype;
private String charset;
private MySQLFrontConnection sc;
private LayerCachePool cachePool;
public void setRoute(RouteResultset rrs){
this.rrs =rrs;
}
/**
 * Parses the SELECT and, when it is a plain MySQL select block, builds the
 * JoinParser that splits it into primary and child queries. Parse failures
 * are silently ignored (joinParser stays null).
 */
public void route(SystemConfig sysConfig, SchemaConfig schema,int sqlType, String realSQL, String charset, MySQLFrontConnection sc,	LayerCachePool cachePool) {
int rs = ServerParse.parse(realSQL);
this.sqltype = rs & 0xff;
this.sysConfig=sysConfig;
this.schema=schema;
this.charset=charset;
this.sc=sc;
this.cachePool=cachePool;
try {
// RouteStrategy routes=RouteStrategyFactory.getRouteStrategy();
// rrs =RouteStrategyFactory.getRouteStrategy().route(sysConfig, schema, sqlType2, realSQL,charset, sc, cachePool);
MySqlStatementParser parser = new MySqlStatementParser(realSQL);
SQLStatement statement = parser.parseStatement();
if(statement instanceof SQLSelectStatement) {
SQLSelectStatement st=(SQLSelectStatement)statement;
SQLSelectQuery sqlSelectQuery =st.getSelect().getQuery();
if(sqlSelectQuery instanceof MySqlSelectQueryBlock) {
MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock)st.getSelect().getQuery();
joinParser=new JoinParser(mysqlSelectQuery,realSQL);
joinParser.parser();
}
}
/*
if (routes instanceof DruidMysqlRouteStrategy) {
SQLSelectStatement st=((DruidMysqlRouteStrategy) routes).getSQLStatement();
SQLSelectQuery sqlSelectQuery =st.getSelect().getQuery();
if(sqlSelectQuery instanceof MySqlSelectQueryBlock) {
MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock)st.getSelect().getQuery();
joinParser=new JoinParser(mysqlSelectQuery,realSQL);
joinParser.parser();
}
}
*/
} catch (Exception e) {
}
}
// Routes sql with the captured parameters; only runs once route() produced a
// JoinParser. Failures are silently ignored (rrs keeps its previous value).
private void getRoute(String sql){
try {
if (joinParser!=null){
rrs =RouteStrategyFactory.getRouteStrategy().route(sysConfig, schema, sqltype,sql,charset, sc, cachePool);
}
} catch (Exception e) {
}
}
// Names of all data nodes in the current routing result.
private String[] getDataNodes(){
String[] dataNodes =new String[rrs.getNodes().length] ;
for (int i=0;i<rrs.getNodes().length;i++){
dataNodes[i]=rrs.getNodes()[i].getName();
}
return dataNodes;
}
// Comma-joined node list, used only for log messages (keeps a trailing comma).
private String getDataNode(String[] dataNodes){
String dataNode="";
for (int i=0;i<dataNodes.length;i++){
dataNode+=dataNodes[i]+",";
}
return dataNode;
}
/**
 * Routes and runs the primary-table query sequentially on every data node;
 * a ShareDBJoinHandler feeds the collected rows back into this instance.
 * Writes an error packet when no data node can be resolved.
 */
public void processSQL(String sql, EngineCtx ctx) {
String ssql=joinParser.getSql();
getRoute(ssql);
RouteResultsetNode[] nodes = rrs.getNodes();
if (nodes == null || nodes.length == 0 || nodes[0].getName() == null
|| nodes[0].getName().equals("")) {
ctx.getSession().getSource().writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
"No dataNode found ,please check tables defined in schema:"
+ ctx.getSession().getSource().getSchema());
return;
}
this.ctx=ctx;
String[] dataNodes =getDataNodes();
maxjob=dataNodes.length;
ShareDBJoinHandler joinHandler = new ShareDBJoinHandler(this,joinParser.getJoinLkey());
ctx.executeNativeSQLSequnceJob(dataNodes, ssql, joinHandler);
EngineCtx.LOGGER.info("Catlet exec:"+getDataNode(getDataNodes())+" sql:" +ssql);
ctx.setAllJobFinishedListener(new AllJobFinishedListener() {
@Override
public void onAllJobFinished(EngineCtx ctx) {
// If no child-table batch was ever fired, the header was never sent.
if (!jointTableIsData) {
ctx.writeHeader(fields);
}
ctx.writeEof();
EngineCtx.LOGGER.info("发送数据OK");
}
});
}
/**
 * Registers one primary-table row: id is the primary-key value, nid the
 * join-key value, findex the join column's position. Fires a batched child
 * query once ~1000 ids are pending.
 */
public void putDBRow(String id,String nid, byte[] rowData,int findex){
rows.put(id, rowData);
ids.put(id, nid);
joinindex=findex;
//ids.offer(nid);
int batchSize = 999;
// Once ~1000 ids are pending, fire a batched child query.
if (ids.size() > batchSize) {
createQryJob(batchSize);
}
}
// Captures the primary-table field list from the first header received.
public void putDBFields(List<byte[]> mFields){
if (!isMfield){
fields=mFields;
}
}
/**
 * Called when one primary-table job ends; after the last one, flushes the
 * remaining ids in a final batch and closes job input.
 */
public void endJobInput(String dataNode, boolean failed){
mjob++;
if (mjob>=maxjob){
createQryJob(Integer.MAX_VALUE);
ctx.endJobInput();
}
// EngineCtx.LOGGER.info("完成"+mjob+":" + dataNode+" failed:"+failed);
}
//private void createQryJob(String dataNode,int batchSize) {
/**
 * Drains up to roughly batchSize pending entries, builds the IN(...) value
 * list (quoting values when the join key is a string type, and skipping a
 * value identical to the immediately preceding one), routes the child SQL
 * and runs it in parallel on all routed nodes.
 */
private void createQryJob(int batchSize) {
int count = 0;
Map<String, byte[]> batchRows = new ConcurrentHashMap<String, byte[]>();
String theId = null;
StringBuilder sb = new StringBuilder().append('(');
String svalue="";
for(Map.Entry<String,String> e: ids.entrySet() ){
theId=e.getKey();
batchRows.put(theId, rows.remove(theId));
if (!svalue.equals(e.getValue())){
if(joinKeyType == Fields.FIELD_TYPE_VAR_STRING
|| joinKeyType == Fields.FIELD_TYPE_STRING){ // join key is a varchar
sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')
}else{ // join key defaults to int/long
sb.append(e.getValue()).append(','); // (1,2,3)
}
}
svalue=e.getValue();
if (count++ > batchSize) {
break;
}
}
/*
while ((theId = ids.poll()) != null) {
batchRows.put(theId, rows.remove(theId));
sb.append(theId).append(',');
if (count++ > batchSize) {
break;
}
}
*/
if (count == 0) {
return;
}
jointTableIsData=true;
// Replace the trailing comma with the closing parenthesis.
sb.deleteCharAt(sb.length() - 1).append(')');
String sql = String.format(joinParser.getChildSQL(), sb);
//if (!childRoute){
getRoute(sql);
//childRoute=true;
//}
ctx.executeNativeSQLParallJob(getDataNodes(),sql, new ShareRowOutPutDataHandler(this,fields,joinindex,joinParser.getJoinRkey(), batchRows));
EngineCtx.LOGGER.info("SQLParallJob:"+getDataNode(getDataNodes())+" sql:" + sql);
}
/**
 * Writes the combined header exactly once, no matter how many parallel
 * child jobs report a header.
 */
public void writeHeader(String dataNode,List<byte[]> afields, List<byte[]> bfields) {
sendField++;
if (sendField==1){
ctx.writeHeader(afields, bfields);
setAllFields(afields, bfields);
// EngineCtx.LOGGER.info("发送字段2:" + dataNode);
}
}
// Merges the field lists: all left-table fields plus the right-table fields
// except its first column (the join id).
private void setAllFields(List<byte[]> afields, List<byte[]> bfields){
allfields=new ArrayList<byte[]>();
for (byte[] field : afields) {
allfields.add(field);
}
//EngineCtx.LOGGER.info("所有字段2:" +allfields.size());
for (int i=1;i<bfields.size();i++){
allfields.add(bfields.get(i));
}
}
public List<byte[]> getAllFields(){
return allfields;
}
public void writeRow(RowDataPacket rowDataPkg){
ctx.writeRow(rowDataPkg);
}
/**
 * Returns the index of the field named fkey, recording its column type in
 * joinKeyType. NOTE(review): returns fields.size() (an out-of-range index)
 * when the name is not found - callers appear to rely on the key existing.
 */
public int getFieldIndex(List<byte[]> fields,String fkey){
int i=0;
for (byte[] field :fields) {
FieldPacket fieldPacket = new FieldPacket();
fieldPacket.read(field);
if (ByteUtil.getString(fieldPacket.name).equals(fkey)){
joinKeyType = fieldPacket.type;
return i;
}
i++;
}
return i;
}
}
/**
 * Receives rows of the primary (left) table from each data node and feeds
 * them into the owning {@link ShareJoin}: the header supplies the field list,
 * each row contributes its primary-key value (column 0) and join-key value,
 * and job completion is forwarded so the final batch can be flushed.
 */
class ShareDBJoinHandler implements SQLJobHandler {

    private final ShareJoin owner;
    private final String joinKeyName;
    private List<byte[]> headerFields;

    public ShareDBJoinHandler(ShareJoin ctx, String joinField) {
        this.owner = ctx;
        this.joinKeyName = joinField;
    }

    @Override
    public void onHeader(String dataNode, byte[] header, List<byte[]> fields) {
        headerFields = fields;
        owner.putDBFields(fields);
    }

    @Override
    public boolean onRowData(String dataNode, byte[] rowData) {
        // Locate the join-key column for this result set.
        int keyIndex = owner.getFieldIndex(headerFields, joinKeyName);
        // Column 0 is the primary key (by convention "id").
        String primaryId = ResultSetUtil.getColumnValAsString(rowData, headerFields, 0);
        String joinValue = ResultSetUtil.getColumnValAsString(rowData, headerFields, keyIndex);
        // Hand the row over for batching against the child table.
        owner.putDBRow(primaryId, joinValue, rowData, keyIndex);
        return false;
    }

    @Override
    public void finished(String dataNode, boolean failed) {
        owner.endJobInput(dataNode, failed);
    }
}
/**
 * Handles rows of the child (right-table) query of a ShareJoin: matches each
 * right-table row to the buffered left-table rows by join-key value, appends
 * the right-table columns (except the first) to the left row, and writes the
 * merged row to the client via the owning ShareJoin.
 */
class ShareRowOutPutDataHandler implements SQLJobHandler {
private final List<byte[]> afields;
private List<byte[]> bfields;
private final ShareJoin ctx;
private final Map<String, byte[]> arows;
private int joinL;//position of the join column in the A (left) table
private int joinR;//position of the join column in the B (right) table
private String joinRkey;//join column name of the B (right) table
public ShareRowOutPutDataHandler(ShareJoin ctx,List<byte[]> afields,int joini,String joinField,Map<String, byte[]> arows) {
super();
this.afields = afields;
this.ctx = ctx;
this.arows = arows;
this.joinL =joini;
this.joinRkey= joinField;
//EngineCtx.LOGGER.info("二次查询:" +arows.size()+ " afields:"+FenDBJoinHandler.getFieldNames(afields));
}
@Override
public void onHeader(String dataNode, byte[] header, List<byte[]> bfields) {
this.bfields=bfields;
// Resolve the right-table join column; ShareJoin sends the header once.
joinR=this.ctx.getFieldIndex(bfields,joinRkey);
ctx.writeHeader(dataNode,afields, bfields);
}
// Join key is not the primary key: scan the buffered left-table rows for one
// whose column at `index` equals `value`, removing and returning the match.
// Safe while iterating because arows is a ConcurrentHashMap. Returns null
// when nothing matches.
private byte[] getRow(String value,int index){
for(Map.Entry<String,byte[]> e: arows.entrySet() ){
String key=e.getKey();
RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(e.getValue(), afields);
String id = ByteUtil.getString(rowDataPkg.fieldValues.get(index));
if (id.equals(value)){
return arows.remove(key);
}
}
return null;
}
@Override
public boolean onRowData(String dataNode, byte[] rowData) {
RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields);
// Extract the join-key value from the right-table row.
String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));
// Find the matching left-table record(s); one output row per match.
byte[] arow = getRow(id,joinL);//arows.remove(id);
while (arow!=null) {
RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow,afields );//ctx.getAllFields());
for (int i=1;i<rowDataPkgold.fieldCount;i++){
// Append every right-table column except the first (the join id).
byte[] bname = rowDataPkgold.fieldValues.get(i);
rowDataPkg.add(bname);
rowDataPkg.addFieldCount(1);
}
//RowData(rowDataPkg);
ctx.writeRow(rowDataPkg);
arow = getRow(id,joinL);
}
return false;
}
@Override
public void finished(String dataNode, boolean failed) {
// EngineCtx.LOGGER.info("完成2:" + dataNode+" failed:"+failed);
}
}
This diff is collapsed.
......@@ -23,44 +23,44 @@
*/
package io.mycat;
import io.mycat.server.config.node.SystemConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.mycat.config.loader.zkprocess.comm.ZkConfig;
import io.mycat.config.model.SystemConfig;
/**
* @author mycat
*/
public final class MycatStartup {
private static final String dateFormat = "yyyy-MM-dd HH:mm:ss";
private static final class Holder {
private static final Logger LOGGER = LoggerFactory
.getLogger(MycatStartup.class);
}
private static final String dateFormat = "yyyy-MM-dd HH:mm:ss";
private static final Logger LOGGER = LoggerFactory.getLogger(MycatStartup.class);
public static void main(String[] args) {
//use zk ?
ZkConfig.getInstance().initZk();
try {
String home = SystemConfig.getHomePath();
if (home == null) {
System.out.println(SystemConfig.SYS_HOME + " is not set.");
System.exit(-1);
}
// init
MycatServer server = MycatServer.getInstance();
server.beforeStart();
public static void main(String[] args) {
try {
String home = SystemConfig.getHomePath();
if (home == null) {
System.out.println(SystemConfig.SYS_HOME + " is not set.");
System.exit(-1);
}
// init
MycatServer server = MycatServer.getInstance();
// startup
server.startup();
System.out.println("MyCAT Server startup successfully. see logs in logs/mycat.log");
// startup
server.startup();
System.out.println("MyCAT Server startup successfully. see logs in logs/mycat.log");
while (true) {
Thread.sleep(300 * 1000);
}
} catch (Exception e) {
SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
Holder.LOGGER.error(sdf.format(new Date()) + " startup error", e);
System.exit(-1);
}
}
}
\ No newline at end of file
} catch (Exception e) {
SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
LOGGER.error(sdf.format(new Date()) + " startup error", e);
System.exit(-1);
}
}
}
package io.mycat.backend;
import io.mycat.net.ClosableConnection;
import io.mycat.route.RouteResultsetNode;
import io.mycat.server.MySQLFrontConnection;
import io.mycat.server.executors.ResponseHandler;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
 * A backend connection from MyCat to a physical database, as used by the
 * front-end executors. Combines pooling state (borrowed/released),
 * transaction state (commit/rollback, autocommit, isolation level) and
 * query execution on behalf of a front connection.
 */
public interface BackendConnection extends ClosableConnection{
    /** Whether a data-modifying statement has been executed on this connection. */
    public boolean isModifiedSQLExecuted();
    /** Whether this connection targets a read slave rather than the write host. */
    public boolean isFromSlaveDB();
    public String getSchema();
    public void setSchema(String newSchema);
    /** Last-activity timestamp (milliseconds), as set via setLastTime. */
    public long getLastTime();
    public boolean isClosedOrQuit();
    /** Attaches an arbitrary caller-owned object; retrieved via getAttachment. */
    public void setAttachment(Object attachment);
    /** Terminates the session — presumably sends a protocol QUIT; confirm in impl. */
    public void quit();
    public void setLastTime(long currentTimeMillis);
    /** Returns this connection to its pool for reuse. */
    public void release();
    /** Installs the handler that receives backend responses. */
    public void setResponseHandler(ResponseHandler commandHandler);
    public void commit();
    /** Executes a raw SQL statement on this connection. */
    public void query(String sql) throws UnsupportedEncodingException;
    public Object getAttachment();
    // public long getThreadId();
    /** Executes a routed statement on behalf of the given front connection. */
    public void execute(RouteResultsetNode node, MySQLFrontConnection source,
            boolean autocommit) throws IOException;
    // NOTE(review): name is misspelled ("Excute") but is part of the public
    // API — renaming would break all implementors/callers.
    public boolean syncAndExcute();
    public void rollback();
    /** Pool bookkeeping: whether the connection is currently lent out. */
    public boolean isBorrowed();
    public void setBorrowed(boolean borrowed);
    public int getTxIsolation();
    public boolean isAutocommit();
    public long getId();
    public void close(String reason);
    public String getCharset();
    /** The datasource (pool) that owns this connection. */
    public PhysicalDatasource getPool();
}
package io.mycat.backend;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import io.mycat.backend.mysql.nio.handler.ResponseHandler;
import io.mycat.net.ClosableConnection;
import io.mycat.route.RouteResultsetNode;
import io.mycat.server.ServerConnection;
/**
 * A backend connection from MyCat to a physical database. Combines pooling
 * state (borrowed/released), transaction state (commit/rollback, autocommit,
 * isolation level) and query execution on behalf of a server connection.
 */
public interface BackendConnection extends ClosableConnection {
    /** Whether a data-modifying statement has been executed on this connection. */
    public boolean isModifiedSQLExecuted();
    /** Whether this connection targets a read slave rather than the write host. */
    public boolean isFromSlaveDB();
    public String getSchema();
    public void setSchema(String newSchema);
    /** Last-activity timestamp (milliseconds), as set via setLastTime. */
    public long getLastTime();
    public boolean isClosedOrQuit();
    /** Attaches an arbitrary caller-owned object; retrieved via getAttachment. */
    public void setAttachment(Object attachment);
    /** Terminates the session — presumably sends a protocol QUIT; confirm in impl. */
    public void quit();
    public void setLastTime(long currentTimeMillis);
    /** Returns this connection to its pool for reuse. */
    public void release();
    /** Installs the response handler; returns success — TODO confirm meaning of false. */
    public boolean setResponseHandler(ResponseHandler commandHandler);
    public void commit();
    /** Executes a raw SQL statement on this connection. */
    public void query(String sql) throws UnsupportedEncodingException;
    public Object getAttachment();
    // public long getThreadId();
    /** Executes a routed statement on behalf of the given server connection. */
    public void execute(RouteResultsetNode node, ServerConnection source,
            boolean autocommit) throws IOException;
    /** Records an executed statement — presumably for SQL statistics/auditing; confirm. */
    public void recordSql(String host, String schema, String statement);
    // NOTE(review): name is misspelled ("Excute") but is part of the public
    // API — renaming would break all implementors/callers.
    public boolean syncAndExcute();
    public void rollback();
    /** Pool bookkeeping: whether the connection is currently lent out. */
    public boolean isBorrowed();
    public void setBorrowed(boolean borrowed);
    public int getTxIsolation();
    public boolean isAutocommit();
    public long getId();
    /** Closes without returning to the pool — presumably discards the physical link; confirm. */
    public void discardClose(String reason);
}
package io.mycat.backend;
import io.mycat.net.Connection;
import io.mycat.net.NetSystem;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.mycat.MycatServer;
import io.mycat.backend.datasource.PhysicalDatasource;
import io.mycat.backend.jdbc.JDBCConnection;
import io.mycat.backend.mysql.nio.MySQLConnection;
import io.mycat.net.NIOProcessor;
public class ConMap {
// key -schema
private final ConcurrentHashMap<String, ConQueue> items = new ConcurrentHashMap<String, ConQueue>();
......@@ -59,56 +64,73 @@ public class ConMap {
public int getActiveCountForSchema(String schema,
PhysicalDatasource dataSouce) {
int total = 0;
for (Connection conn : NetSystem.getInstance().getAllConnectios()
.values()) {
if (conn instanceof BackendConnection) {
BackendConnection theCon = (BackendConnection) conn;
if (theCon.getSchema().equals(schema)
&& theCon.getPool() == dataSouce) {
if (theCon.isBorrowed()) {
total++;
for (NIOProcessor processor : MycatServer.getInstance().getProcessors()) {
for (BackendConnection con : processor.getBackends().values()) {
if (con instanceof MySQLConnection) {
MySQLConnection mysqlCon = (MySQLConnection) con;
if (mysqlCon.getSchema().equals(schema)
&& mysqlCon.getPool() == dataSouce
&& mysqlCon.isBorrowed()) {
total++;
}
}
}
}
return total;
}
}else if (con instanceof JDBCConnection) {
JDBCConnection jdbcCon = (JDBCConnection) con;
if (jdbcCon.getSchema().equals(schema) && jdbcCon.getPool() == dataSouce
&& jdbcCon.isBorrowed()) {
total++;
}
}
}
}
return total;
}
public int getActiveCountForDs(PhysicalDatasource dataSouce) {
int total = 0;
for (Connection conn : NetSystem.getInstance().getAllConnectios()
.values()) {
if (conn instanceof BackendConnection) {
BackendConnection theCon = (BackendConnection) conn;
if (theCon.getPool() == dataSouce) {
if (theCon.isBorrowed()) {
total++;
for (NIOProcessor processor : MycatServer.getInstance().getProcessors()) {
for (BackendConnection con : processor.getBackends().values()) {
if (con instanceof MySQLConnection) {
MySQLConnection mysqlCon = (MySQLConnection) con;
if (mysqlCon.getPool() == dataSouce
&& mysqlCon.isBorrowed() && !mysqlCon.isClosed()) {
total++;
}
}
}
}
return total;
}
public void clearConnections(String reason, PhysicalDatasource dataSouce) {
Iterator<Entry<Long, Connection>> itor = NetSystem.getInstance()
.getAllConnectios().entrySet().iterator();
while (itor.hasNext()) {
Entry<Long, Connection> entry = itor.next();
Connection con = entry.getValue();
if (con instanceof BackendConnection) {
if (((BackendConnection) con).getPool() == dataSouce) {
con.close(reason);
itor.remove();
}
}
} else if (con instanceof JDBCConnection) {
JDBCConnection jdbcCon = (JDBCConnection) con;
if (jdbcCon.getPool() == dataSouce
&& jdbcCon.isBorrowed() && !jdbcCon.isClosed()) {
total++;
}
}
}
}
return total;
}
public void clearConnections(String reason, PhysicalDatasource dataSouce) {
for (NIOProcessor processor : MycatServer.getInstance().getProcessors()) {
ConcurrentMap<Long, BackendConnection> map = processor.getBackends();
Iterator<Entry<Long, BackendConnection>> itor = map.entrySet().iterator();
while (itor.hasNext()) {
Entry<Long, BackendConnection> entry = itor.next();
BackendConnection con = entry.getValue();
if (con instanceof MySQLConnection) {
if (((MySQLConnection) con).getPool() == dataSouce) {
con.close(reason);
itor.remove();
}
}else if((con instanceof JDBCConnection)
&& (((JDBCConnection) con).getPool() == dataSouce)){
con.close(reason);
itor.remove();
}
}
}
items.clear();
items.clear();
}
}
}
\ No newline at end of file
......@@ -37,10 +37,12 @@ public class ConQueue {
this.executeCount++;
}
public void removeCon(BackendConnection con) {
if (!autoCommitCons.remove(con)) {
manCommitCons.remove(con);
public boolean removeCon(BackendConnection con) {
boolean removed = autoCommitCons.remove(con);
if (!removed) {
return manCommitCons.remove(con);
}
return removed;
}
public boolean isSameCon(BackendConnection con) {
......@@ -65,13 +67,13 @@ public class ConQueue {
count);
while (!manCommitCons.isEmpty() && readyCloseCons.size() < count) {
BackendConnection theCon = manCommitCons.poll();
if (theCon != null) {
if (theCon != null&&!theCon.isBorrowed()) {
readyCloseCons.add(theCon);
}
}
while (!autoCommitCons.isEmpty() && readyCloseCons.size() < count) {
BackendConnection theCon = autoCommitCons.poll();
if (theCon != null) {
if (theCon != null&&!theCon.isBorrowed()) {
readyCloseCons.add(theCon);
}
......
/*
* Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software;Designed and Developed mainly by many Chinese
* opensource volunteers. you can redistribute it and/or modify it under the
* terms of the GNU General Public License version 2 only, as published by the
* Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Any questions about this component can be directed to it's project Web address
* https://code.google.com/p/opencloudb/.
*
*/
package io.mycat.backend;
import io.mycat.util.TimeUtil;
import java.util.LinkedList;
import java.util.List;
/**
* 记录最近3个时段的平均响应时间,默认1,10,30分钟。
*
* @author mycat
*/
/**
 * Tracks the average heartbeat response time over the last three time
 * windows — 1, 10 and 30 minutes by default.
 *
 * @author mycat
 */
public class HeartbeatRecorder {

    private static final int MAX_RECORD_SIZE = 256;
    private static final long AVG1_TIME = 60 * 1000L;
    private static final long AVG2_TIME = 10 * 60 * 1000L;
    private static final long AVG3_TIME = 30 * 60 * 1000L;

    // Rolling averages for the 1 / 10 / 30 minute windows.
    private long avg1;
    private long avg2;
    private long avg3;
    // Samples ordered oldest-first, capped at MAX_RECORD_SIZE entries.
    private final List<Record> records;

    public HeartbeatRecorder() {
        this.records = new LinkedList<Record>();
    }

    /** Returns the three averages formatted as "avg1,avg2,avg3". */
    public String get() {
        return avg1 + "," + avg2 + "," + avg3;
    }

    /**
     * Records one heartbeat response time and refreshes the averages.
     * Negative values are silently ignored.
     *
     * @param value measured response time; must be non-negative to be recorded
     */
    public void set(long value) {
        if (value < 0) {
            return;
        }
        long now = TimeUtil.currentTimeMillis();
        remove(now);
        if (records.isEmpty()) {
            // First sample after a purge: all windows collapse to this value.
            records.add(new Record(value, now));
            avg1 = avg2 = avg3 = value;
            return;
        }
        if (records.size() >= MAX_RECORD_SIZE) {
            records.remove(0);
        }
        records.add(new Record(value, now));
        calculate(now);
    }

    /**
     * Drops samples older than the largest statistics window (AVG3_TIME).
     */
    private void remove(long now) {
        while (!records.isEmpty()) {
            Record oldest = records.get(0);
            if (now < oldest.time + AVG3_TIME) {
                break;
            }
            records.remove(0);
        }
    }

    /**
     * Recomputes the three windowed averages from the retained samples.
     */
    private void calculate(long now) {
        long sum1 = 0L, sum2 = 0L, sum3 = 0L;
        int n1 = 0, n2 = 0, n3 = 0;
        for (Record sample : records) {
            long age = now - sample.time;
            if (age <= AVG1_TIME) {
                sum1 += sample.value;
                ++n1;
            }
            if (age <= AVG2_TIME) {
                sum2 += sample.value;
                ++n2;
            }
            if (age <= AVG3_TIME) {
                sum3 += sample.value;
                ++n3;
            }
        }
        // The sample just appended by set() has age 0 and therefore falls
        // inside every window, so none of the counters can be zero here.
        avg1 = sum1 / n1;
        avg2 = sum2 / n2;
        avg3 = sum3 / n3;
    }

    /**
     * One heartbeat sample: the measured value plus its capture time.
     *
     * @author mycat
     */
    private static class Record {
        private final long value;
        private final long time;

        Record(long value, long time) {
            this.value = value;
            this.time = time;
        }
    }
}
\ No newline at end of file
/*
* Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software;Designed and Developed mainly by many Chinese
* opensource volunteers. you can redistribute it and/or modify it under the
* terms of the GNU General Public License version 2 only, as published by the
* Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Any questions about this component can be directed to it's project Web address
* https://code.google.com/p/opencloudb/.
*
*/
package io.mycat.backend;
import java.io.IOException;
import io.mycat.backend.heartbeat.DBHeartbeat;
import io.mycat.backend.heartbeat.MySQLHeartbeat;
import io.mycat.backend.nio.MySQLBackendConnectionFactory;
import io.mycat.server.config.node.DBHostConfig;
import io.mycat.server.config.node.DataHostConfig;
import io.mycat.server.executors.ResponseHandler;
/**
* @author mycat
*/
/**
 * MySQL-backed physical datasource: creates NIO backend connections via a
 * dedicated factory and uses a MySQL-specific heartbeat to probe liveness.
 *
 * @author mycat
 */
public class MySQLDataSource extends PhysicalDatasource {
    // Factory producing the NIO MySQL backend connections for this datasource.
    private final MySQLBackendConnectionFactory factory;

    public MySQLDataSource(DBHostConfig config, DataHostConfig hostConfig,
            boolean isReadNode) {
        super(config, hostConfig, isReadNode);
        this.factory = new MySQLBackendConnectionFactory();
    }

    /**
     * Creates a new backend connection for the given schema; the result is
     * delivered asynchronously through the supplied handler.
     */
    @Override
    public void createNewConnection(ResponseHandler handler,String schema) throws IOException {
        factory.make(this, handler,schema);
    }

    /** Builds the MySQL-specific heartbeat used to monitor this datasource. */
    @Override
    public DBHeartbeat createHeartBeat() {
        return new MySQLHeartbeat(this);
    }
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment