Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: support mysql update join sql #4914

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,17 @@ protected String getColumnNameInSQL(String columnName) {
return tableAlias == null ? columnName : tableAlias + "." + columnName;
}

/**
* Gets column name in sql.
*
* @param tableAlias the tableAlias
* @param columnName the column name
* @return the column name in sql
*/
protected String getColumnNameInSQL(String tableAlias,String columnName) {
return tableAlias == null ? columnName : tableAlias + "." + columnName;
}

/**
* Gets several column name in sql.
*
Expand All @@ -225,6 +236,27 @@ protected String getColumnNamesInSQL(List<String> columnNameList) {
return columnNamesStr.toString();
}

/**
* Gets several column name in sql.
*
* @param tableAlias the table alias
* @param columnNameList the column name
* @return the column name in sql
*/
protected String getColumnNamesInSQL(String tableAlias,List<String> columnNameList) {
if (Objects.isNull(columnNameList) || columnNameList.isEmpty()) {
slievrly marked this conversation as resolved.
Show resolved Hide resolved
return null;
}
StringBuilder columnNamesStr = new StringBuilder();
for (int i = 0; i < columnNameList.size(); i++) {
if (i > 0) {
columnNamesStr.append(" , ");
}
columnNamesStr.append(getColumnNameInSQL(tableAlias,columnNameList.get(i)));
}
return columnNamesStr.toString();
slievrly marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Gets from table in sql.
*
Expand Down Expand Up @@ -275,6 +307,21 @@ protected boolean containsPK(List<String> columns) {
return getTableMeta().containsPK(newColumns);
}

/**
* the columns contains table meta pk
*
* @param tableName the tableName
* @param columns the column name list
* @return true: contains pk false: not contains pk
*/
protected boolean containsPK(String tableName,List<String> columns) {
if (columns == null || columns.isEmpty()) {
slievrly marked this conversation as resolved.
Show resolved Hide resolved
slievrly marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
List<String> newColumns = ColumnUtils.delEscape(columns, getDbType());
return getTableMeta(tableName).containsPK(newColumns);
}


/**
* compare column name and primary key name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import io.seata.core.model.BranchType;
import io.seata.rm.datasource.StatementProxy;
import io.seata.rm.datasource.exec.mysql.MySQLInsertOnDuplicateUpdateExecutor;
import io.seata.rm.datasource.exec.mysql.MySQLUpdateJoinExecutor;
import io.seata.rm.datasource.sql.SQLVisitorFactory;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLType;
import io.seata.sqlparser.util.JdbcConstants;

/**
Expand Down Expand Up @@ -113,6 +115,15 @@ public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecogniz
throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
}
break;
case UPDATE_JOIN:
switch (dbType) {
case JdbcConstants.MYSQL:
executor = new MySQLUpdateJoinExecutor<>(statementProxy,statementCallback,sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name());
}
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecu
private static final Configuration CONFIG = ConfigurationFactory.getInstance();

private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(
ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);
ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);

/**
* Instantiates a new Update executor.
Expand All @@ -59,7 +59,7 @@ public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecu
* @param sqlRecognizer the sql recognizer
*/
public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
SQLRecognizer sqlRecognizer) {
SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}

Expand All @@ -73,7 +73,6 @@ protected TableRecords beforeImage() throws SQLException {

private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuilder prefix = new StringBuilder("SELECT ");
StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
Expand All @@ -90,24 +89,9 @@ private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>>
}
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
if (ONLY_CARE_UPDATE_COLUMNS) {
if (!containsPK(updateColumns)) {
selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
}
for (String columnName : updateColumns) {
selectSQLJoin.add(columnName);
}

// The on update xxx columns will be auto update by db, so it's also the actually updated columns
List<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();
onUpdateColumns.removeAll(updateColumns);
for (String onUpdateColumn : onUpdateColumns) {
selectSQLJoin.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));
}
} else {
for (String columnName : tableMeta.getAllColumns().keySet()) {
selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
}
List<String> needUpdateColumns = getNeedUpdateColumns(tableMeta.getTableName(), sqlRecognizer.getTableAlias(), recognizer.getUpdateColumns());
for (String needUpdateColumn : needUpdateColumns) {
selectSQLJoin.add(needUpdateColumn);
}
return selectSQLJoin.toString();
}
Expand All @@ -134,28 +118,36 @@ private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage)
String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(tableMeta.getPrimaryKeyOnlyName(), beforeImage.pkRows().size(), getDbType());
String suffix = " FROM " + getFromTableInSQL() + " WHERE " + whereSql;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FROM WHERE 是否可以考虑定义成静态常量来使用?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FROM WHERE 是否可以考虑定义成静态常量来使用?

这个pr先不要定义了,刘洋有个pr专门搞这个的 @doubleDimple

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这块已有的常量比如where可以直接使用,没有的就先不要专门弄了

StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
List<String> needUpdateColumns = getNeedUpdateColumns(tableMeta.getTableName(), sqlRecognizer.getTableAlias(), recognizer.getUpdateColumns());
for (String needUpdateColumn : needUpdateColumns) {
selectSQLJoiner.add(needUpdateColumn);
}
return selectSQLJoiner.toString();
}

protected List<String> getNeedUpdateColumns(String table, String tableAlias, List<String> originUpdateColumns) {
List<String> needUpdateColumns = new ArrayList<>();
slievrly marked this conversation as resolved.
Show resolved Hide resolved
TableMeta tableMeta = getTableMeta(table);
if (ONLY_CARE_UPDATE_COLUMNS) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
if (!containsPK(updateColumns)) {
selectSQLJoiner.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
if (!containsPK(table, originUpdateColumns)) {
needUpdateColumns.add(getColumnNamesInSQL(tableAlias, tableMeta.getEscapePkNameList(getDbType())));
}
for (String columnName : updateColumns) {
selectSQLJoiner.add(columnName);
for (String columnName : originUpdateColumns) {
needUpdateColumns.add(columnName);
}

// The on update xxx columns will be auto update by db, so it's also the actually updated columns
List<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();
onUpdateColumns.removeAll(updateColumns);
onUpdateColumns.removeAll(originUpdateColumns);
for (String onUpdateColumn : onUpdateColumns) {
selectSQLJoiner.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));
needUpdateColumns.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));
}
} else {
for (String columnName : tableMeta.getAllColumns().keySet()) {
selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));
needUpdateColumns.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
return selectSQLJoiner.toString();
return needUpdateColumns;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.rm.datasource.exec.mysql;

import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.util.IOUtil;
import io.seata.common.util.StringUtils;
import io.seata.rm.datasource.SqlGenerateUtils;
import io.seata.rm.datasource.StatementProxy;
import io.seata.rm.datasource.exec.StatementCallback;
import io.seata.rm.datasource.exec.UpdateExecutor;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

slievrly marked this conversation as resolved.
Show resolved Hide resolved
/**
* @author renliangyu857
*/
public class MySQLUpdateJoinExecutor<T, S extends Statement> extends UpdateExecutor<T, S> {
private final Map<String, TableRecords> beforeImagesMap = new LinkedHashMap<>(4);
private final Map<String, TableRecords> afterImagesMap = new LinkedHashMap<>(4);

/**
* Instantiates a new Update executor.
*
* @param statementProxy the statement proxy
* @param statementCallback the statement callback
* @param sqlRecognizer the sql recognizer
*/
public MySQLUpdateJoinExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}

@Override
protected TableRecords beforeImage() throws SQLException {
ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
String tableNames = recognizer.getTableName();
if (StringUtils.isEmpty(tableNames)) {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw Exceptions with an explicit meaning, otherwise it will result in NPE with an unclear meaning

}
// update join sql,like update t1 inner join t2 on t1.id = t2.id set t1.name = ?; tableItems = {"update t1 inner join t2","t1","t2"}
String[] tableItems = tableNames.split(recognizer.MULTI_TABLE_NAME_SEPERATOR);
String joinTable = tableItems[0];
int itemTableIndex = 1;
for (int i = itemTableIndex; i < tableItems.length; i++) {
String selectSQL = buildBeforeImageSQL(joinTable, tableItems[i], paramAppenderList);
TableRecords tableRecords = buildTableRecords(getTableMeta(tableItems[i]), selectSQL, paramAppenderList);
beforeImagesMap.put(tableItems[i], tableRecords);
}
return null;
slievrly marked this conversation as resolved.
Show resolved Hide resolved
}

private String buildBeforeImageSQL(String joinTable, String itemTable, ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
StringBuilder prefix = new StringBuilder("SELECT ");
StringBuilder suffix = new StringBuilder(" FROM ").append(joinTable);
slievrly marked this conversation as resolved.
Show resolved Hide resolved
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
String orderByCondition = buildOrderCondition(recognizer, paramAppenderList);
String limitCondition = buildLimitCondition(recognizer, paramAppenderList);
if (StringUtils.isNotBlank(whereCondition)) {
suffix.append(WHERE).append(whereCondition);
}
if (StringUtils.isNotBlank(orderByCondition)) {
suffix.append(" ").append(orderByCondition);
}
if (StringUtils.isNotBlank(limitCondition)) {
suffix.append(" ").append(limitCondition);
}
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
TableMeta itemTableMeta = this.getTableMeta(itemTable);
List<String> itemTableUpdateColumns = getItemUpdateColumns(itemTableMeta.getAllColumns().keySet(), recognizer.getUpdateColumns());
List<String> needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns);
for (String needUpdateColumn : needUpdateColumns) {
selectSQLJoin.add(needUpdateColumn);
}
return selectSQLJoin.toString();
}

@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
String tableNames = recognizer.getTableName();
if (StringUtils.isEmpty(tableNames)) {
return null;
}
String[] tableItems = tableNames.split(recognizer.MULTI_TABLE_NAME_SEPERATOR);
String joinTable = tableItems[0];
int itemTableIndex = 1;
for (int i = itemTableIndex; i < tableItems.length; i++) {
TableRecords tableBeforeImage = beforeImagesMap.get(tableItems[i]);
String selectSQL = buildAfterImageSQL(joinTable, tableItems[i], tableBeforeImage);
ResultSet rs = null;
try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
SqlGenerateUtils.setParamForPk(tableBeforeImage.pkRows(), getTableMeta(tableItems[i]).getPrimaryKeyOnlyName(), pst);
rs = pst.executeQuery();
TableRecords afterImage = TableRecords.buildRecords(getTableMeta(tableItems[i]), rs);
afterImagesMap.put(tableItems[i], afterImage);
} finally {
IOUtil.close(rs);
}
}
return null;
}

private String buildAfterImageSQL(String joinTable, String itemTable,
TableRecords beforeImage) throws SQLException {
TableMeta itemTableMeta = getTableMeta(itemTable);
StringBuilder prefix = new StringBuilder("SELECT ");
String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(itemTableMeta.getPrimaryKeyOnlyName(), beforeImage.pkRows().size(), getDbType());
String suffix = " FROM " + joinTable + " WHERE " + whereSql;
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
List<String> itemTableUpdateColumns = getItemUpdateColumns(itemTableMeta.getAllColumns().keySet(), recognizer.getUpdateColumns());
List<String> needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns);
for (String needUpdateColumn : needUpdateColumns) {
selectSQLJoiner.add(needUpdateColumn);
}
return selectSQLJoiner.toString();
}

private List<String> getItemUpdateColumns(Set<String> itemAllColumns, List<String> updateColumns) {
List<String> itemUpdateColumns = new ArrayList<>();
for (String updateColumn : updateColumns) {
if (itemAllColumns.contains(updateColumn)) {
itemUpdateColumns.add(updateColumn);
}
}
return itemUpdateColumns;
}

@Override
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
if (beforeImagesMap == null || afterImagesMap == null) {
throw new IllegalStateException("images can not be null");
}
for (Map.Entry<String, TableRecords> entry : beforeImagesMap.entrySet()) {
String tableName = entry.getKey();
TableRecords tableBeforeImage = entry.getValue();
TableRecords tableAfterImage = afterImagesMap.get(tableName);
if (tableBeforeImage.getRows().size() != tableAfterImage.getRows().size()) {
throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
}
super.prepareUndoLog(tableBeforeImage, tableAfterImage);
}
}
}
Loading