Skip to content

Commit

Permalink
Revert "fix bkajoin step 2"
Browse files Browse the repository at this point in the history
This reverts commit 9341c7c
  • Loading branch information
junwen12221 committed May 28, 2022
1 parent fe0964b commit c96bab0
Showing 1 changed file with 39 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.SqlShuttle;
import org.apache.calcite.sql.util.SqlString;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
Expand All @@ -42,7 +41,6 @@
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.mycat.calcite.MycatImplementor.MYCAT_SQL_LOOKUP_IN;

Expand Down Expand Up @@ -250,9 +248,8 @@ public static Observable<Object[]> dispatchRightObservable(NewMycatDataContext c
if (argsList.isEmpty()) {
return Observable.empty();
}
JoinInfo joinInfo = JoinInfo.of(tableLookup.getInput(), tableLookup.getRight(), tableLookup.getJoinCondition());
List<String> rightFieldNames = RelOptUtil.findAllTables(rightView.getRelNode()).get(0).getRowType().getFieldNames();
RexShuttle rexShuttle = argSolver(joinInfo, argsList);
RexShuttle rexShuttle = argSolver(tableLookup.getInput(), argsList, rightFieldNames);
RelNode mycatInnerRelNode = rightView.getRelNode().accept(new RelShuttleImpl() {
@Override
public RelNode visit(RelNode other) {
Expand All @@ -279,7 +276,7 @@ public RelNode visit(RelNode other) {
}

@NotNull
private static RexShuttle argSolver(JoinInfo joinInfo, List<Object[]> argsList) {
private static RexShuttle argSolver(RelNode left, List<Object[]> argsList, List<String> rightFieldNames) {
return new RexShuttle() {
@Override
public void visitEach(Iterable<? extends RexNode> exprs) {
Expand All @@ -303,41 +300,54 @@ public RexNode visitDynamicParam(RexDynamicParam dynamicParam) {

@Override
public RexNode visitCall(RexCall call) {
RexBuilder rexBuilder = MycatCalciteSupport.RexBuilder;
if (call.getOperator() == MYCAT_SQL_LOOKUP_IN) {
RexBuilder rexBuilder = MycatCalciteSupport.RexBuilder;
List<RexNode> operands = call.getOperands();
RexCall exprRow = (RexCall) operands.get(0);
List<Integer> needColumnList = exprRow.getOperands().stream().map(i -> ((RexInputRef) i).getIndex()).collect(Collectors.toList());
List<Integer> needValueList = joinInfo.pairs().stream()
.filter(intPair -> needColumnList.contains(intPair.target))
.map(intPair -> intPair.target).collect(Collectors.toList());
if (needColumnList.size() != needValueList.size()){
throw new UnsupportedOperationException("may be a bug");
}
List<String> columnNames = exprRow.getOperands().stream().map(i -> ((RexInputRef) i).getIndex()).map(i -> rightFieldNames.get(i)).collect(Collectors.toList());
List<String> valueNames = left.getRowType().getFieldNames();
columnNames = columnNames.stream().filter(i -> valueNames.contains(i)).collect(Collectors.toList());
List<RexNode> collect = exprRow.getOperands().stream().filter(rexNode -> {
int index = ((RexInputRef) rexNode).getIndex();
return valueNames.contains( rightFieldNames.get(index));
}).collect(Collectors.toList());
exprRow = (RexCall) rexBuilder.makeCall(SqlStdOperatorTable.ROW, collect);
List<List<RexLiteral>> rowList = new ArrayList<>();
for (Object[] args : argsList) {
List<RexLiteral> row = needValueList.stream()
.map(i -> args[i])
.map(o -> {
if (o == null){
return (RexLiteral)MycatCalciteSupport.RexBuilder.makeNullLiteral(SqlTypeName.VARCHAR);
}
return MycatCalciteSupport.RexBuilder.makeLiteral(Optional.of(o).map(o1 -> {
if (o1 instanceof LocalDateTime) {
return java.sql.Timestamp.valueOf((LocalDateTime) o1).toString();
List<RexLiteral> row = new ArrayList<>();
for (int i = 0; i < columnNames.size(); i++) {
String needColumnName = columnNames.get(i);


for (int j = 0; j < valueNames.size(); j++) {
String fieldName = valueNames.get(j);
if (needColumnName.equals(fieldName)) {
RelDataTypeField relDataTypeField = left.getRowType().getFieldList().get(j);
RelDataType fieldType = relDataTypeField.getType();
RexLiteral right = (RexLiteral) MycatCalciteSupport.RexBuilder.makeLiteral(Optional.ofNullable(args[j]).map(new Function<Object, String>() {
@Override
public String apply(Object o) {
if (o instanceof LocalDateTime) {
return java.sql.Timestamp.valueOf((LocalDateTime) o).toString();
}
return o.toString();
}
return o1.toString();
}).orElse(null));
})
.collect(Collectors.toList());
row.add(right);
}
}
}
rowList.add(row);
}
if (rowList.get(0).size() != columnNames.size()) {
throw new UnsupportedOperationException("may be a bug");
}

if (rowList.size() == 1) {
ArrayList<RexNode> ands = new ArrayList<>();
List<RexNode> exprs = exprRow.getOperands();
for (int i = 0; i < needColumnList.size(); i++) {
RexNode left = exprs.get(i);
RexNode right = rowList.get(0).get(needColumnList.get(i));
for (int i = 0; i < exprRow.getOperands().size(); i++) {
RexNode right = rowList.get(0).get(i);
RexNode left = exprRow.getOperands().get(i);
ands.add(MycatCalciteSupport.RexBuilder
.makeCall(SqlStdOperatorTable.EQUALS, left, MycatCalciteSupport.RexBuilder.makeCast(left.getType(), right)));
}
Expand Down

0 comments on commit c96bab0

Please sign in to comment.