Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/v1.21-2022-2-9'
Browse files Browse the repository at this point in the history
  • Loading branch information
junwen12221 committed Feb 11, 2022
2 parents 1710d82 + 782d7ff commit 078c1f3
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public synchronized void prepareQuery(String sql, List<Object> params, MysqlColl
}

} catch (Exception e) {
LOGGER.error("",e);
collector.onError(e);
return Future.failedFuture(e);
} finally {
Expand Down Expand Up @@ -228,6 +229,7 @@ public Observable<VectorSchemaRoot> prepareQuery(String sql, List<Object> params
}
}
} catch (Exception e) {
LOGGER.error("",e);
emitter.onError(e);
return Future.failedFuture(e);
} finally {
Expand Down Expand Up @@ -460,6 +462,7 @@ updateCount, getLastInsertId(callableStatement)
}
return Future.succeededFuture(resultSetList);
} catch (Exception exception) {
LOGGER.error("",exception);
return Future.failedFuture(exception);
}
});
Expand Down Expand Up @@ -500,6 +503,7 @@ public synchronized Future<SqlResult> insert(String sql, List<Object> params) {
sqlResult.setLastInsertId(lastInsertId);
return Future.succeededFuture(sqlResult);
} catch (Exception e) {
LOGGER.error("",e);
return Future.failedFuture(e);
}
});
Expand Down Expand Up @@ -550,6 +554,7 @@ public synchronized Future<SqlResult> update(String sql, List<Object> params) {
sqlResult.setLastInsertId(lastInsertId);
return Future.succeededFuture(sqlResult);
} catch (Exception e) {
LOGGER.error("",e);
return Future.failedFuture(e);
}
});
Expand Down
3 changes: 1 addition & 2 deletions mycat2/src/main/java/io/mycat/vertx/VertxMycatServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ public int kill(List<Long> ids) {
public void addSession(VertxSession vertxSession) {
NetSocket socket = vertxSession.getSocket();
socket.closeHandler(event -> {
String message = "session:{} is closing:{}";
LOGGER.info(message, vertxSession);
LOGGER.info("session:{} is closing", vertxSession);
sessions.remove(vertxSession);
});
sessions.add(vertxSession);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@

package io.vertx.core.impl.future;

import cn.mycat.vertx.xa.impl.XaLogImpl;
import io.mycat.commands.VertxMySQLDatasourcePoolImpl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Objects;
Expand All @@ -29,7 +29,7 @@
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
class FutureImpl<T> extends FutureBase<T> {
private final static Logger LOGGER = LoggerFactory.getLogger(FutureImpl.class);
private static final Logger LOGGER = LoggerFactory.getLogger(VertxMySQLDatasourcePoolImpl.class);

private static final Object NULL_VALUE = new Object();

Expand Down
10 changes: 5 additions & 5 deletions va/src/main/java/cn/mycat/vertx/xa/impl/BaseXaSqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.*;
Expand All @@ -37,7 +37,7 @@


public class BaseXaSqlConnection extends AbstractXaSqlConnection {
private final static Logger LOGGER = LoggerFactory.getLogger(BaseXaSqlConnection.class);
private static final Logger LOGGER = LoggerFactory.getLogger(BaseXaSqlConnection.class);
protected final ConcurrentHashMap<String, NewMycatConnection> map = new ConcurrentHashMap<>();
protected final Map<NewMycatConnection, State> connectionState = Collections.synchronizedMap(new IdentityHashMap<>());
protected final List<NewMycatConnection> extraConnections = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -171,7 +171,7 @@ public Future<Void> rollback() {
//@todo 注册调度中心,定时恢复
return Future.failedFuture(message);
} catch (Exception e) {
LOGGER.error(e);
LOGGER.error("",e);
return Future.failedFuture(e);
}
}).onComplete(promise);
Expand All @@ -191,7 +191,7 @@ private boolean tryRecovery(Set<String> targets) throws InterruptedException {
log.readXARecoveryLog(map);
return true;
} catch (Exception e) {
LOGGER.error(e);
LOGGER.error("",e);
} finally {
map.values().forEach(c -> {
if (c != null) {
Expand Down
12 changes: 7 additions & 5 deletions va/src/main/java/cn/mycat/vertx/xa/impl/LocalSqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import io.mycat.newquery.NewMycatConnection;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -99,7 +100,7 @@ public Future<Void> rollback() {
inTranscation = false;
//每一个记录日志
return Future.succeededFuture();
}).mapEmpty().flatMap(o -> closeStatementState());
}).onFailure(event -> LOGGER.error("",event)).mapEmpty().flatMap(o -> closeStatementState());
}

@Override
Expand All @@ -108,7 +109,7 @@ public Future<Void> commit() {
return CompositeFuture.join(rollback).onComplete(event -> {
inTranscation = false;
//每一个记录日志
}).mapEmpty().flatMap(o -> closeStatementState());
}).onFailure(event -> LOGGER.error("",event)).mapEmpty().flatMap(o -> closeStatementState());
}

@Override
Expand All @@ -127,7 +128,8 @@ public Future<Void> commitXa(Function<ImmutableCoordinatorLog, Future<Void>> bef
@Override
public Future<Void> close() {
Function<NewMycatConnection, Future<Void>> consumer = newMycatConnection -> {
return newMycatConnection.close();
newMycatConnection.abandonConnection();
return Future.succeededFuture();
};
return close(consumer);
}
Expand Down
28 changes: 19 additions & 9 deletions va/src/main/java/cn/mycat/vertx/xa/impl/LocalXaSqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import io.mycat.newquery.NewMycatConnection;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -120,7 +120,11 @@ public Future<Void> rollback() {
}
if (localSqlConnection != null && map.isEmpty()) {
inTranscation = false;
return localSqlConnection.update("rollback;").compose(unused -> localSqlConnection.close()).mapEmpty();
return localSqlConnection.update("rollback;").transform(unused -> {
LOGGER.error("", unused.cause());
localSqlConnection.abandonConnection();
return Future.succeededFuture();
}).mapEmpty();
}
String curXid = this.xid;
NewMycatConnection curLocalSqlConnection = this.localSqlConnection;
Expand All @@ -129,7 +133,11 @@ public Future<Void> rollback() {
this.targetName = null;
this.xid = null;
return curLocalSqlConnection.update("delete from mycat.xa_log where xid = '" + curXid + "'");
})).compose(u -> curLocalSqlConnection.close()).mapEmpty();
})).transform(u -> {
LOGGER.error("", u.cause());
curLocalSqlConnection.abandonConnection();
return Future.succeededFuture();
}).mapEmpty();
}

@Override
Expand Down Expand Up @@ -161,11 +169,12 @@ public Future<Void> close() {
if (localSqlConnection != null) {
return localSqlConnection
.update("rollback")
.flatMap(c -> localSqlConnection.close()
.onComplete(event1 -> {
localSqlConnection = null;
targetName = null;
}));
.onComplete(c -> {
LOGGER.error("", c.cause());
localSqlConnection.abandonConnection();
localSqlConnection = null;
targetName = null;
}).mapEmpty();

} else {
return Future.succeededFuture();
Expand Down Expand Up @@ -193,6 +202,7 @@ public Future<Void> kill() {
return future.flatMap(unused -> {
if (localSqlConnection != null) {
localSqlConnection.abandonConnection();
localSqlConnection = null;
}
return super.kill();
});
Expand Down

0 comments on commit 078c1f3

Please sign in to comment.