Skip to content

Commit

Permalink
Merge branch 'v1.21-2022-2-9'
Browse files Browse the repository at this point in the history
# Conflicts:
#	config/src/main/java/io/mycat/config/ServerConfig.java
#	mycat2/src/main/java/io/mycat/vertx/VertxMySQLAuthHandler.java
#	mycat2/src/main/java/io/mycat/vertx/VertxMycatServer.java
  • Loading branch information
junwen12221 committed Mar 1, 2022
2 parents 3014707 + d1f285e commit ff53eb6
Show file tree
Hide file tree
Showing 22 changed files with 190 additions and 62 deletions.
6 changes: 3 additions & 3 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.2.3</version>
<version>4.2.5</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-sql-client</artifactId>
<version>4.2.3</version>
<version>4.2.5</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mysql-client</artifactId>
<version>4.2.3</version>
<version>4.2.5</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
4 changes: 4 additions & 0 deletions common/src/main/java/io/mycat/MycatDataContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,8 @@ default public int getMergeUnionSize() {
public Integer releaseLock(String name);

public Integer isFreeLock(String name);

void setHolder(Object holder);

Object getHolder();
}
29 changes: 29 additions & 0 deletions common/src/main/java/io/mycat/ZipUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.mycat;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ZipUtil {

public static void main(String[] args) throws Exception{
JarFile jarFile = new JarFile(
""
);
Map<String, JarEntry> map= new HashMap<>();
jarFile.stream().forEach(new Consumer<JarEntry>() {
@Override
public void accept(JarEntry jarEntry) {
if(!map.containsKey(jarEntry.getName())){
map.put(jarEntry.getName(),jarEntry);
}else {
System.out.println();
}

}
});
System.out.println();
}
}
2 changes: 2 additions & 0 deletions config/src/main/java/io/mycat/config/ServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class ServerConfig {
private String serverVersion = "5.7.33-mycat-2.0";
private boolean ignoreCast = false;
private int fullTableScanLimit = 1024;
private boolean fullTableScanException = false;
//BROADCAST
private boolean forcedPushDownBroadcast = false;
private boolean bkaJoin = true;
Expand All @@ -74,6 +75,7 @@ public class ServerConfig {
private String pushDownSelectDual = "hackRouter";


private boolean useProxyProtocol = false;
public static void main(String[] args) {
System.out.println(JsonUtil.toJson(new ServerConfig()));
}
Expand Down
4 changes: 2 additions & 2 deletions example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mysql-client</artifactId>
<version>4.2.3</version>
<version>4.2.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
Expand Down Expand Up @@ -125,7 +125,7 @@
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<version>4.2.3</version>
<version>4.2.5</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
36 changes: 28 additions & 8 deletions example/src/test/java/io/mycat/client/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@
import io.mycat.DrdsSqlWithParams;
import io.mycat.assemble.MycatTest;
import io.mycat.calcite.DrdsRunnerHelper;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ClientTest implements MycatTest {
@Test
Expand Down Expand Up @@ -54,7 +53,7 @@ public void testDBeaver() throws Exception {
*
*/
DrdsSqlWithParams drdsSqlWithParams = DrdsRunnerHelper.preParse("SELECT @@global.character_set_server, @@global.collation_server", null);
Assert.assertEquals("SELECT @@global.character_set_server, @@global.collation_server",drdsSqlWithParams.getParameterizedSQL());
Assert.assertEquals("SELECT @@global.character_set_server, @@global.collation_server", drdsSqlWithParams.getParameterizedSQL());
try (Connection mySQLConnection = getMySQLConnection(DB_MYCAT)) {
// Statement statement = mySQLConnection.createStatement();
// ResultSet resultSet = statement.executeQuery("SELECT * FROM `information_schema`.`CHARACTER_SETS` LIMIT 0, 1000; ");
Expand Down Expand Up @@ -128,4 +127,25 @@ public void tesHikariCP() throws Exception {
}

}

@Test
@Ignore
public void tesSLB() throws Exception {
Vertx vertx = Vertx.vertx();
NetClientOptions netClientOptions = new NetClientOptions();
List<Future> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
vertx.createNetClient(netClientOptions).connect(8066, "127.0.0.1", event -> {
if (event.succeeded()) {
NetSocket netSocket = event.result();
netSocket.close();
list.add(Future.succeededFuture());
}else {
list.add(Future.failedFuture(event.cause()));
}
});
}
CompositeFuture.join(list).toCompletionStage().toCompletableFuture().get(11,TimeUnit.SECONDS);
}
}
4 changes: 4 additions & 0 deletions hbt/src/main/java/io/mycat/AsyncMycatDataContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public abstract class AsyncMycatDataContextImpl extends NewMycatDataContextImpl
protected final static Logger LOGGER = LoggerFactory.getLogger(AsyncMycatDataContextImpl.class);
protected final static Logger FULL_TABLE_SCAN_LOGGER = LoggerFactory.getLogger("FULL_TABLE_SCAN_LOGGER");
public static int FULL_TABLE_SCAN_LIMIT = 1024;
public static boolean FULL_TABLE_SCAN_EXCEPTION ;
final Map<String, Future<NewMycatConnection>> transactionConnnectionMap = new HashMap<>();// int transaction
final List<Future<NewMycatConnection>> connnectionFutureCollection = new LinkedList<>();//not int transaction
final Map<String, List<Observable<Object[]>>> shareObservable = new HashMap<>();
Expand Down Expand Up @@ -192,6 +193,9 @@ public List<Observable<Object[]>> getObservableList(String node) {
if ((sqlMap.size() > FULL_TABLE_SCAN_LIMIT) && FULL_TABLE_SCAN_LOGGER.isInfoEnabled()) {
FULL_TABLE_SCAN_LOGGER.info(" warning sql:{},partition count:{},limit:{},it may be a full table scan.",
drdsSqlWithParams.toString(),sqlMap.size(),FULL_TABLE_SCAN_LIMIT);
if(FULL_TABLE_SCAN_EXCEPTION){
throw new MycatException("FULL_TABLE_SCAN_EXCEPTION:{}",drdsSqlWithParams.toString());
}
}
boolean share = mycatRelDatasourceSourceInfo.refCount > 0;
List<Observable<Object[]>> observables = getObservables((view
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void split() {
case BIT_XOR:
case BIT_AND:
case SUM0:
case ANY_VALUE:
splitCommon(aggregateCall);
break;
default:
Expand Down
46 changes: 25 additions & 21 deletions hbt/src/main/java/io/mycat/calcite/rewriter/SQLRBORewriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -615,27 +615,31 @@ public static Optional<RelNode> aggregate(RelNode original, Aggregate aggregate)
}

private static Optional<RelNode> splitAggregate(MycatView viewNode, Aggregate aggregate) {

AggregatePushContext aggregateContext = AggregatePushContext.split(aggregate);

MycatView newView = viewNode.changeTo(
LogicalAggregate.create(viewNode.getRelNode(),
aggregate.getHints(),
aggregate.getGroupSet(),
aggregate.getGroupSets(),
aggregateContext.getPartialAggregateCallList()));

LogicalAggregate globalAggregateRelNode = LogicalAggregate.create(newView, aggregate.getHints(),
aggregate.getGroupSet(),
aggregate.getGroupSets(),
aggregateContext.getGlobalAggregateCallList());

MycatProject projectRelNode = MycatProject.create(globalAggregateRelNode,
aggregateContext.getProjectExprList(),
aggregate.getRowType());

return RexUtil.isIdentity(projectRelNode.getProjects(), projectRelNode.getInput().getRowType()) ?
Optional.of(globalAggregateRelNode) : Optional.of(projectRelNode);
try {
AggregatePushContext aggregateContext = AggregatePushContext.split(aggregate);

MycatView newView = viewNode.changeTo(
LogicalAggregate.create(viewNode.getRelNode(),
aggregate.getHints(),
aggregate.getGroupSet(),
aggregate.getGroupSets(),
aggregateContext.getPartialAggregateCallList()));

LogicalAggregate globalAggregateRelNode = LogicalAggregate.create(newView, aggregate.getHints(),
aggregate.getGroupSet(),
aggregate.getGroupSets(),
aggregateContext.getGlobalAggregateCallList());

MycatProject projectRelNode = MycatProject.create(globalAggregateRelNode,
aggregateContext.getProjectExprList(),
aggregate.getRowType());

return RexUtil.isIdentity(projectRelNode.getProjects(), projectRelNode.getInput().getRowType()) ?
Optional.of(globalAggregateRelNode) : Optional.of(projectRelNode);
}catch (Throwable throwable){
LOGGER.debug("",throwable);
}
return Optional.empty();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2140,10 +2140,11 @@ public Optional<String> getCreateTableSQLByJDBC(String schemaName, String tableN
return Optional.of(createTableSql);
}
}
return Optional.empty();
continue;
} catch (Exception e) {
LOGGER.error("", e);
}
continue;
} catch (Throwable e) {
LOGGER.error("can not get create table sql from:" + backend.getTargetName() + backend.getTargetSchemaTable(), e);
continue;
Expand Down
47 changes: 34 additions & 13 deletions mycat2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
<artifactId>mycat2</artifactId>

<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-codec-haproxy -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-haproxy</artifactId>
<version>4.1.74.Final</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.r2dbc/r2dbc-pool -->
<!-- <dependency>-->
<!-- <groupId>io.r2dbc</groupId>-->
Expand Down Expand Up @@ -198,7 +205,7 @@
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.2.3</version>
<version>4.2.5</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down Expand Up @@ -231,25 +238,39 @@

<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<finalName>mycat2-${project.version}</finalName>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>io.mycat.MycatCore</mainClass><!--这里改成自己的主类位置-->
</manifest>
</archive>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
<goal>shade</goal>
</goals>
<configuration>
<transformers>

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>io.mycat.MycatCore</mainClass>
</transformer>

</transformers>
</configuration>
</execution>
</executions>
</plugin>
Expand Down
1 change: 1 addition & 0 deletions mycat2/src/main/java/io/mycat/MycatCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public MycatCore() {
ThreadPoolExecutorConfig workerPool = serverConfig.getServer().getWorkerPool();

AsyncMycatDataContextImpl.FULL_TABLE_SCAN_LIMIT = serverConfiguration.serverConfig().getServer().getFullTableScanLimit();
AsyncMycatDataContextImpl.FULL_TABLE_SCAN_EXCEPTION = serverConfiguration.serverConfig().getServer().isFullTableScanException();
HackRouter.PUSH_DOWN_SELECT_DUAL ="hackRouter".equalsIgnoreCase(serverConfiguration.serverConfig().getServer().getPushDownSelectDual());

NewMycatConnectionConfig.FORCE_NATIVE_DATASOURCE = "native".equalsIgnoreCase(System.getProperty("server"));
Expand Down
10 changes: 6 additions & 4 deletions mycat2/src/main/java/io/mycat/commands/MycatdbCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ private Future<Void> handleBaseline(String text, MycatDataContext dataContext, R
}
if (text.startsWith("PARAMETERIZE")) {
text = text.substring("PARAMETERIZE".length()).trim();
DrdsSqlWithParams drdsSqlWithParams = DrdsRunnerHelper.preParse(text,dataContext.getDefaultSchema());
DrdsSqlWithParams drdsSqlWithParams = DrdsRunnerHelper.preParse(text, dataContext.getDefaultSchema());
String parameterizedSQL = drdsSqlWithParams.getParameterizedSQL();
String info = drdsSqlWithParams.toString();
ResultSetBuilder builder = ResultSetBuilder.create();
builder.addColumnInfo("PARAMETERIZED_SQL", JDBCType.VARCHAR)
.addColumnInfo("INFO", JDBCType.VARCHAR);
builder.addObjectRowPayload(Arrays.asList(parameterizedSQL,info));
builder.addObjectRowPayload(Arrays.asList(parameterizedSQL, info));
return response.sendResultSet(builder.build());
}
}
Expand All @@ -339,6 +339,7 @@ private static boolean isNavicatClientStatusQuery(String text) {

@NotNull
private static Map<String, Object> getHintRoute(SQLStatement sqlStatement) {
HashMap<String, Object> map = new HashMap<>();
List<SQLHint> hints = new LinkedList<>();
MySqlASTVisitorAdapter mySqlASTVisitorAdapter = new MySqlASTVisitorAdapter() {
@Override
Expand All @@ -358,7 +359,6 @@ public boolean visit(SQLSelect x) {
mycatHint = new MycatHint(text);
}
if (mycatHint != null) {
HashMap<String, Object> map = new HashMap<>();
map.put("REP_BALANCE_TYPE", ReplicaBalanceType.NONE);
for (MycatHint.Function function : mycatHint.getFunctions()) {
String name = function.getName();
Expand Down Expand Up @@ -423,7 +423,7 @@ public boolean visit(SQLSelect x) {
return map;
}
}
return Collections.emptyMap();
return map;
}

public static Future<Void> execute(MycatDataContext dataContext, Response receiver, SQLStatement sqlStatement) {
Expand All @@ -440,6 +440,7 @@ public static Future<Void> execute(MycatDataContext dataContext, Response receiv
TransactionSession transactionSession = dataContext.getTransactionSession();
Future future = transactionSession.openStatementState();
LogEntryHolder logRecord = logMonitor.startRecord(dataContext, null, sqlType, sql);
dataContext.setHolder(logRecord);
future = future.flatMap(unused -> {
try {
//////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -491,6 +492,7 @@ public static Future<Void> execute(MycatDataContext dataContext, Response receiv
});

future = future.onComplete((Handler<AsyncResult>) event -> {
dataContext.setHolder(null);
if (event.succeeded()) {
logRecord.recordSQLEnd(true, Collections.emptyMap(), "");
} else {
Expand Down
Loading

0 comments on commit ff53eb6

Please sign in to comment.