Skip to content

Commit

Permalink
Merge branch '1.21-release-jar-with-dependencies-2022-5-5-ban-abandon…
Browse files Browse the repository at this point in the history
…-connection'

# Conflicts:
#	common/src/main/java/io/mycat/newquery/NewMycatConnection.java
#	example/src/test/java/io/mycat/monitor/MycatMonitorTest.java
#	mycat2/src/main/java/io/mycat/vertxmycat/NativeMySQLConnection.java
  • Loading branch information
junwen12221 committed May 8, 2022
2 parents 7e92b92 + 30bcbff commit c27bd83
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

public interface NewMycatConnection {

String getTargetName();


default void query(String sql, MysqlCollector collector) {
prepareQuery(sql, Collections.emptyList(), collector);
}
Expand Down Expand Up @@ -42,6 +45,8 @@ default Future<RowSet> query(String sql) {

public Future<Void> close();

public boolean isClosed();

default void onSend() {

}
Expand Down
10 changes: 10 additions & 0 deletions common/src/main/java/io/mycat/newquery/NewMycatConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,16 @@ public Future<Void> close() {
});
}

@Override
public boolean isClosed() {
try {
return this.connection.isClosed();
}catch (Throwable throwable){
LOGGER.error("",throwable);
return true;
}
}

@Override
public void abandonConnection() {
JdbcUtils.close(this.connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,16 @@ public class NewVertxConnectionImpl implements NewMycatConnection {

MySQLConnectionImpl mySQLConnection;
CursorHandler cursorHandler = null;
String targetName;

public NewVertxConnectionImpl(MySQLConnectionImpl mySQLConnection) {
public NewVertxConnectionImpl(String targetName,MySQLConnectionImpl mySQLConnection) {
this.mySQLConnection = mySQLConnection;
this.targetName = targetName;
}

@Override
public String getTargetName() {
return this.targetName;
}

@Override
Expand Down Expand Up @@ -572,6 +579,11 @@ public Future<Void> close() {
return Future.succeededFuture();
}

@Override
public boolean isClosed() {
return mySQLConnection == null;
}

@Override
public void abandonConnection() {
LOGGER.debug("abandonConnection");
Expand Down
1 change: 1 addition & 0 deletions example/src/test/java/io/mycat/assemble/AssembleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.mycat.hint.CreateClusterHint;
import io.mycat.hint.CreateDataSourceHint;
import lombok.SneakyThrows;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down
5 changes: 5 additions & 0 deletions example/src/test/java/io/mycat/assemble/MycatTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,9 @@ public default MycatRowMetaData getColumns(Connection connection, String db, Str
return new CopyMycatRowMetaData(new JdbcRowMetaData(metaData));
}
}
default long getUseCon(Connection connection,String dsName) throws Exception {
List<Map<String, Object>> maps = executeQuery(connection, "/*+ mycat:showDataSources{} */");
return maps.stream().filter(r -> dsName.equalsIgnoreCase((String) r.get("NAME"))).map(r -> (Number) r.get("USE_CON")).findFirst().get().longValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.stream.IntStream;

@Disabled
@Ignore
public abstract class DefaultBackendConnectionTest implements MycatTest {
@Test
public void testPrototypeNoTranscationSelect() throws Exception {
Expand All @@ -32,8 +31,19 @@ public void testPrototypeNoTranscationSelect() throws Exception {
public void testPrototypeTranscationSelectCommit() throws Exception {
try (Connection mycatConnection = getMySQLConnection(DB_MYCAT)) {
execute(mycatConnection, RESET_CONFIG);
execute(mycatConnection, "DROP DATABASE db1");
execute(mycatConnection, "CREATE DATABASE db1");
execute(mycatConnection, "CREATE TABLE db1.`travelrecord2` (\n" +
" `id` bigint(20) NOT NULL KEY,\n" +
" `user_id` varchar(100) CHARACTER SET utf8 DEFAULT NULL,\n" +
" `traveldate` datetime(6) DEFAULT NULL,\n" +
" `fee` decimal(10,0) DEFAULT NULL,\n" +
" `days` int(11) DEFAULT NULL,\n" +
" `blob` longblob DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4\n");

mycatConnection.setAutoCommit(false);
repeatSql(mycatConnection, "SELECT * FROM `mysql`.`role_edges` LIMIT 0, 1000; ",400);
repeatSql(mycatConnection, "SELECT * FROM `db1`.`travelrecord2` LIMIT 0, 1000; ",400);
Assert.assertEquals(1,getUseCon(mycatConnection,"prototypeDs"));
mycatConnection.commit();

Expand All @@ -44,8 +54,20 @@ public void testPrototypeTranscationSelectCommit() throws Exception {
public void testPrototypeTranscationSelectRollback() throws Exception {
try (Connection mycatConnection = getMySQLConnection(DB_MYCAT)) {
execute(mycatConnection, RESET_CONFIG);

execute(mycatConnection, "DROP DATABASE db1");
execute(mycatConnection, "CREATE DATABASE db1");
execute(mycatConnection, "CREATE TABLE db1.`travelrecord2` (\n" +
" `id` bigint(20) NOT NULL KEY,\n" +
" `user_id` varchar(100) CHARACTER SET utf8 DEFAULT NULL,\n" +
" `traveldate` datetime(6) DEFAULT NULL,\n" +
" `fee` decimal(10,0) DEFAULT NULL,\n" +
" `days` int(11) DEFAULT NULL,\n" +
" `blob` longblob DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4\n");

mycatConnection.setAutoCommit(false);
repeatSql(mycatConnection, "SELECT * FROM `mysql`.`role_edges` LIMIT 0, 1000; ",400);
repeatSql(mycatConnection, "SELECT * FROM `db1`.`travelrecord2` LIMIT 0, 1000; ",400);
Assert.assertEquals(1,getUseCon(mycatConnection,"prototypeDs"));
mycatConnection.rollback();

Expand All @@ -56,8 +78,20 @@ public void testPrototypeTranscationSelectRollback() throws Exception {
public void testPrototypeTranscationSelectSetAutocommit() throws Exception {
try (Connection mycatConnection = getMySQLConnection(DB_MYCAT)) {
execute(mycatConnection, RESET_CONFIG);

execute(mycatConnection, "DROP DATABASE db1");
execute(mycatConnection, "CREATE DATABASE db1");
execute(mycatConnection, "CREATE TABLE db1.`travelrecord2` (\n" +
" `id` bigint(20) NOT NULL KEY,\n" +
" `user_id` varchar(100) CHARACTER SET utf8 DEFAULT NULL,\n" +
" `traveldate` datetime(6) DEFAULT NULL,\n" +
" `fee` decimal(10,0) DEFAULT NULL,\n" +
" `days` int(11) DEFAULT NULL,\n" +
" `blob` longblob DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4\n");

mycatConnection.setAutoCommit(false);
repeatSql(mycatConnection, "SELECT * FROM `mysql`.`role_edges` LIMIT 0, 1000; ",400);
repeatSql(mycatConnection, "SELECT * FROM `db1`.`travelrecord2` LIMIT 0, 1000; ",400);
Assert.assertEquals(1,getUseCon(mycatConnection,"prototypeDs"));
mycatConnection.setAutoCommit(true);

Expand Down Expand Up @@ -168,6 +202,18 @@ public void testNormalTranscationSelectCommit() throws Exception {
public void testNormalTranscationSelectRollback() throws Exception {
try (Connection mycatConnection = getMySQLConnection(DB_MYCAT)) {
execute(mycatConnection, RESET_CONFIG);

execute(mycatConnection, "DROP DATABASE db1");
execute(mycatConnection, "CREATE DATABASE db1");
execute(mycatConnection, "CREATE TABLE db1.`travelrecord2` (\n" +
" `id` bigint(20) NOT NULL KEY,\n" +
" `user_id` varchar(100) CHARACTER SET utf8 DEFAULT NULL,\n" +
" `traveldate` datetime(6) DEFAULT NULL,\n" +
" `fee` decimal(10,0) DEFAULT NULL,\n" +
" `days` int(11) DEFAULT NULL,\n" +
" `blob` longblob DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4\n");

execute(mycatConnection, "DROP DATABASE db1");
execute(mycatConnection, "CREATE DATABASE db1");
execute(mycatConnection, "CREATE TABLE db1.`travelrecord2` (\n" +
Expand All @@ -179,7 +225,7 @@ public void testNormalTranscationSelectRollback() throws Exception {
" `blob` longblob DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4\n");
mycatConnection.setAutoCommit(false);
repeatSql(mycatConnection, "SELECT * FROM `mysql`.`role_edges` LIMIT 0, 1000; ",400);
repeatSql(mycatConnection, "SELECT * FROM `db1`.`travelrecord2` LIMIT 0, 1000; ",400);
Assert.assertEquals(1,getUseCon(mycatConnection,"prototypeDs"));
mycatConnection.rollback();

Expand Down Expand Up @@ -398,6 +444,15 @@ private void initTestData(Connection mycatConnection) throws Exception {

execute(mycatConnection, "USE `db1`;");

execute(mycatConnection, "CREATE TABLE `tmp` (\n" +
" `id` bigint(20) NOT NULL KEY,\n" +
" `user_id` varchar(100) CHARACTER SET utf8 DEFAULT NULL,\n" +
" `traveldate` datetime(6) DEFAULT NULL,\n" +
" `fee` decimal(10,0) DEFAULT NULL,\n" +
" `days` int(11) DEFAULT NULL,\n" +
" `blob` longblob DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4\n");

execute(mycatConnection, "CREATE TABLE `travelrecord2` (\n" +
" `id` bigint(20) NOT NULL KEY,\n" +
" `user_id` varchar(100) CHARACTER SET utf8 DEFAULT NULL,\n" +
Expand All @@ -421,12 +476,6 @@ public void testShardingTranscationSelect() throws Exception {
Assert.assertEquals(0,getUseCon(mycatConnection,"ds1"));
}
}
private long getUseCon(Connection connection,String dsName) throws Exception {
List<Map<String, Object>> maps = executeQuery(connection, "/*+ mycat:showDataSources{} */");
return maps.stream().filter(r -> dsName.equalsIgnoreCase((String) r.get("NAME"))).map(r -> (Number) r.get("USE_CON")).findFirst().get().longValue();
}



private void repeatSql(Connection mycatConnection, String sql, int count) throws Exception {
for (int i = 0; i < count; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
@NotThreadSafe
@net.jcip.annotations.NotThreadSafe
@Disabled
@Ignore
public class ProxyBackendConnectionTest extends DefaultBackendConnectionTest{
private boolean init = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
@NotThreadSafe
@net.jcip.annotations.NotThreadSafe
@Disabled
@Ignore
public class XaBackendConnectionTest extends DefaultBackendConnectionTest {
boolean init = false;

Expand Down
13 changes: 9 additions & 4 deletions example/src/test/java/io/mycat/monitor/MycatMonitorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.Json;
import lombok.SneakyThrows;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.*;
import org.testng.Assert;

import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -39,6 +36,7 @@ public static void beforeClass() {
}

@AfterClass
@SneakyThrows
public static void afterClass() {
if (vertx != null) {
vertx.close();
Expand All @@ -52,6 +50,13 @@ public synchronized void before() {

}

@After
@SneakyThrows
public synchronized void after() {
try (Connection mycatConnection = getMySQLConnection(DB_MYCAT);) {
JdbcUtils.execute(mycatConnection, "/*+mycat:setSqlTimeFilter{value:30} */", Collections.emptyList());
}
}
@Test
@SneakyThrows
public void test() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public Future<NewMycatConnection> getConnection() {
DatabaseInstanceEntry stat = DatabaseInstanceEntry.stat(targetName);
stat.plusCon();
stat.plusQps();
return new NewVertxConnectionImpl((MySQLConnectionImpl) sqlConnection) {
return new NewVertxConnectionImpl(targetName,(MySQLConnectionImpl) sqlConnection) {
long start;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public ThreadMycatConnectionImplWrapper(DatabaseInstanceEntry stat, NewMycatConn
this.newMycatConnection = newMycatConnection;
}

@Override
public String getTargetName() {
return this.newMycatConnection.getTargetName();
}

@Override
public Future<RowSet> query(String sql, List<Object> params) {
IOExecutor ioExecutor = MetaClusterCurrent.wrapper(IOExecutor.class);
Expand Down Expand Up @@ -150,6 +155,11 @@ public Future<Void> close() {
});
}

@Override
public boolean isClosed() {
return newMycatConnection.isClosed();
}

@Override
public void abandonConnection() {
IOExecutor ioExecutor = MetaClusterCurrent.wrapper(IOExecutor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,9 @@ public Future<Void> clearConnections() {
return future.onComplete(u -> executeTranscationConnection(c -> {
return c.close();
}).onComplete(c -> {
for (NewMycatConnection connection : map.values()) {
connection.close();
}
map.clear();
connectionState.clear();
}));
Expand Down
12 changes: 4 additions & 8 deletions va/src/main/java/cn/mycat/vertx/xa/impl/LocalSqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class LocalSqlConnection extends AbstractXaSqlConnection {
protected final Supplier<MySQLManager> mySQLManagerSupplier;

public LocalSqlConnection(MySQLIsolation isolation, Supplier<MySQLManager> mySQLManagerSupplier, XaLog xaLog) {
super(isolation,xaLog);
super(isolation, xaLog);
this.mySQLManagerSupplier = mySQLManagerSupplier;
}

Expand Down Expand Up @@ -80,11 +80,7 @@ public Future<NewMycatConnection> getConnection(String targetName) {
}
return mySQLManager().getConnection(targetName)
.map(connection -> {
if (!map.containsKey(targetName)) {
map.put(targetName, connection);
} else {
extraConnections.add(connection);
}
extraConnections.add(connection);
return connection;
});
}
Expand All @@ -101,7 +97,7 @@ public Future<Void> rollback() {
inTranscation = false;
//每一个记录日志
return Future.succeededFuture();
}).onFailure(event -> LOGGER.error("",event)).mapEmpty().flatMap(o -> closeStatementState());
}).onFailure(event -> LOGGER.error("", event)).mapEmpty().flatMap(o -> closeStatementState());
}

@Override
Expand All @@ -110,7 +106,7 @@ public Future<Void> commit() {
return CompositeFuture.join(rollback).onComplete(event -> {
inTranscation = false;
//每一个记录日志
}).onFailure(event -> LOGGER.error("",event)).mapEmpty().flatMap(o -> closeStatementState());
}).onFailure(event -> LOGGER.error("", event)).mapEmpty().flatMap(o -> closeStatementState());
}

@Override
Expand Down
Loading

0 comments on commit c27bd83

Please sign in to comment.