Skip to content

Commit

Permalink
improve removeAbandonedTimeoutSecond
Browse files Browse the repository at this point in the history
  • Loading branch information
junwen12221 committed Jun 7, 2022
1 parent 4d2e2f0 commit cf8464e
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ default void onRev() {

public long getActiveTimeStamp();

public int getRemoveAbandonedTimeoutSecond();
}
21 changes: 14 additions & 7 deletions common/src/main/java/io/mycat/newquery/NewMycatConnectionImpl.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.mycat.newquery;

import com.alibaba.druid.pool.DruidConnectionHolder;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLReplaceable;
import com.alibaba.druid.sql.ast.SQLStatement;
Expand All @@ -13,6 +11,7 @@
import io.mycat.beans.mycat.*;
import io.mycat.beans.mysql.packet.ColumnDefPacket;
import io.mycat.beans.mysql.packet.ColumnDefPacketImpl;
import io.mycat.config.DatasourceConfig;
import io.reactivex.rxjava3.core.Observable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
Expand Down Expand Up @@ -54,16 +53,19 @@ public class NewMycatConnectionImpl implements NewMycatConnection {

private long activeTimestamp = System.currentTimeMillis();

private DatasourceConfig config;

public NewMycatConnectionImpl(boolean needLastInsertId, Connection connection) {
this.needLastInsertId = needLastInsertId;
this.connection = connection;
}

public NewMycatConnectionImpl(String targetName, Connection connection, String dbType) {
this.targetName = targetName;
public NewMycatConnectionImpl(DatasourceConfig datasourceConfig, Connection connection) {
this.targetName = datasourceConfig.getName();
this.connection = connection;
this.needLastInsertId = true;
this.dbType = dbType;
this.dbType = datasourceConfig.getDbType();
this.config = datasourceConfig;
}

@Override
Expand Down Expand Up @@ -504,14 +506,14 @@ public synchronized Future<SqlResult> insert(String sql, List<Object> params) {
lastInsertId = getLastInsertId(statement);
}
} else {
if(isClickHouse()){
if (isClickHouse()) {
String paramize = paramize(sql, params);
try (Statement statement = connection.createStatement();) {
onSend();
LOGGER.debug("sql:{}", paramize);
affectRows = statement.executeUpdate(paramize);
onRev();
lastInsertId= 0;
lastInsertId = 0;
}
} else {
try (PreparedStatement preparedStatement = connection.prepareStatement(sql, needLastInsertId ? Statement.RETURN_GENERATED_KEYS : Statement.NO_GENERATED_KEYS)) {
Expand Down Expand Up @@ -649,6 +651,11 @@ public long getActiveTimeStamp() {
return this.activeTimestamp;
}

@Override
public int getRemoveAbandonedTimeoutSecond() {
return this.config.getRemoveAbandonedTimeoutSecond();
}

private long getLastInsertId(Statement statement) {
long lastInsertId = 0;
if (needLastInsertId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ public Future<NewMycatConnection> getConnection() {
DatabaseInstanceEntry stat = DatabaseInstanceEntry.stat(targetName);
stat.plusCon();
stat.plusQps();
NewMycatConnectionImpl newMycatConnection = new NewMycatConnectionImpl(targetName, defaultConnection.getRawConnection(),
defaultConnection.getDataSource().getDbType()
) {
NewMycatConnectionImpl newMycatConnection = new NewMycatConnectionImpl(defaultConnection.getDataSource().getConfig(), defaultConnection.getRawConnection()) {
long start;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public static MycatDatasourcePool createNativeDatasourcePool(DatasourceConfig da
config.setRetry(datasource.getMaxRetryCount());
config.setTimer(datasource.getIdleTimeout());
config.setClientDeprecateEof(NewMycatConnectionConfig.CLIENT_DEPRECATE_EOF);
config.setRemoveAbandonedTimeoutSecond(datasource.getRemoveAbandonedTimeoutSecond());
Vertx vertx = MetaClusterCurrent.wrapper(Vertx.class);
VertxPoolConnectionImpl vertxConnectionPool = new VertxPoolConnectionImpl(config, vertx);
return new MycatNativeDatasourcePool(vertxConnectionPool, targetName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,18 @@ public ThreadMycatConnectionImplWrapper(DatabaseInstanceEntry stat, NewMycatConn
this.stat = stat;
this.newMycatConnection = newMycatConnection;

Vertx vertx = MetaClusterCurrent.wrapper(Vertx.class);
ServerConfig serverConfig = MetaClusterCurrent.wrapper(ServerConfig.class);
TimerConfig idleTimer = serverConfig.getIdleTimer();
if (idleTimer != null && idleTimer.getPeriod() > 0) {
long period = TimeUnit.valueOf(idleTimer.getTimeUnit()).toMillis(idleTimer.getPeriod());
int removeAbandonedTimeoutSecond = getRemoveAbandonedTimeoutSecond();
if (removeAbandonedTimeoutSecond > 0) {
Vertx vertx = MetaClusterCurrent.wrapper(Vertx.class);
long period = TimeUnit.SECONDS.toMillis(removeAbandonedTimeoutSecond);
timeId = vertx.setPeriodic(period, id -> {
if (newMycatConnection.isClosed()) {
vertx.cancelTimer(id);
}else {
} else {
long duration = System.currentTimeMillis() - newMycatConnection.getActiveTimeStamp();
if (duration > period) {
// vertx.cancelTimer(id);
ThreadMycatConnectionImplWrapper.this.abandonConnection();
vertx.cancelTimer(id);
}
}
});
Expand Down Expand Up @@ -239,4 +238,9 @@ public void onActiveTimestamp(long timestamp) {
public long getActiveTimeStamp() {
return this.newMycatConnection.getActiveTimeStamp();
}

@Override
public int getRemoveAbandonedTimeoutSecond() {
return this.newMycatConnection.getRemoveAbandonedTimeoutSecond();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,9 @@ public void onActiveTimestamp(long timestamp) {
public long getActiveTimeStamp() {
return this.activeTimeStamp;
}

@Override
public int getRemoveAbandonedTimeoutSecond() {
return this.connection.getConfig().getRemoveAbandonedTimeoutSecond();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public static class Config {
int minCon = 1;
long timer = TimeUnit.SECONDS.toMillis(30);
int retry = 3;

int removeAbandonedTimeoutSecond=180;//秒
}

@SneakyThrows
Expand Down

0 comments on commit cf8464e

Please sign in to comment.