Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

column to column comparisons for filtering file scans and row data #11152

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ protected BoundPredicate(Operation op, BoundTerm<T> term) {
super(op, term);
}

protected BoundPredicate(Operation op, BoundTerm<T> term, BoundTerm<T> rightTerm) {
super(op, term, rightTerm);
}

public boolean test(StructLike struct) {
return test(term().eval(struct));
}
Expand Down Expand Up @@ -58,6 +62,14 @@ public BoundLiteralPredicate<T> asLiteralPredicate() {
throw new IllegalStateException("Not a literal predicate: " + this);
}

public boolean isTermPredicate() {
return false;
}

public BoundTermPredicate<T> asTermPredicate() {
throw new IllegalStateException("Not a term predicate: " + this);
}

public boolean isSetPredicate() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.expressions;

import org.apache.iceberg.util.NaNUtil;

public class BoundTermPredicate<T> extends BoundPredicate<T> {

BoundTermPredicate(Operation op, BoundTerm<T> term, BoundTerm<T> rightTerm) {
super(op, term, rightTerm);
}

@Override
public Expression negate() {
return new BoundTermPredicate<>(op().negate(), term(), rightTerm());
}

@Override
public boolean isTermPredicate() {
return true;
}

@Override
public BoundTermPredicate<T> asTermPredicate() {
return this;
}

@Override
public boolean test(T value) { // TODO
switch (op()) {
case IS_NULL:
return value == null;
case NOT_NULL:
return value != null;
case IS_NAN:
return NaNUtil.isNaN(value);
case NOT_NAN:
return !NaNUtil.isNaN(value);
default:
throw new IllegalStateException("Invalid operation for BoundTermPredicate: " + op());
}
}

@Override
public boolean isEquivalentTo(Expression other) {
if (op() == other.op()) {
return term().isEquivalentTo(((BoundTermPredicate<?>) other).term());
}

return false;
}

@Override
public String toString() {
switch (op()) {
case LT:
return term() + " < " + rightTerm();
case LT_EQ:
return term() + " <= " + rightTerm();
case GT:
return term() + " > " + rightTerm();
case GT_EQ:
return term() + " >= " + rightTerm();
case EQ:
return term() + " == " + rightTerm();
case NOT_EQ:
return term() + " != " + rightTerm();
default:
return "Invalid term predicate: operation = " + op();
}
}
}
83 changes: 83 additions & 0 deletions api/src/main/java/org/apache/iceberg/expressions/Evaluator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundVisitor;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.NaNUtil;

Expand Down Expand Up @@ -107,35 +108,107 @@ public <T> Boolean lt(Bound<T> valueExpr, Literal<T> lit) {
return cmp.compare(valueExpr.eval(struct), lit.value()) < 0;
}

@Override
public <T> Boolean lt(Bound<T> valueExpr, Bound<T> valueExpr2) {
validateDataTypes(valueExpr, valueExpr2);
T value = valueExpr.eval(struct);
T value2 = valueExpr2.eval(struct);
if (value == null || value2 == null) {
return false;
}
Comparator<T> cmp = Comparators.forType(valueExpr.ref().type().asPrimitiveType());
return cmp.compare(value, value2) < 0;
}

@Override
public <T> Boolean ltEq(Bound<T> valueExpr, Literal<T> lit) {
Comparator<T> cmp = lit.comparator();
return cmp.compare(valueExpr.eval(struct), lit.value()) <= 0;
}

@Override
public <T> Boolean ltEq(Bound<T> valueExpr, Bound<T> valueExpr2) {
validateDataTypes(valueExpr, valueExpr2);
T value = valueExpr.eval(struct);
T value2 = valueExpr2.eval(struct);
if (value == null || value2 == null) {
return false;
}
Comparator<T> cmp = Comparators.forType(valueExpr.ref().type().asPrimitiveType());
return cmp.compare(value, value2) <= 0;
}

@Override
public <T> Boolean gt(Bound<T> valueExpr, Literal<T> lit) {
Comparator<T> cmp = lit.comparator();
return cmp.compare(valueExpr.eval(struct), lit.value()) > 0;
}

@Override
public <T> Boolean gt(Bound<T> valueExpr, Bound<T> valueExpr2) {
validateDataTypes(valueExpr, valueExpr2);
T value = valueExpr.eval(struct);
T value2 = valueExpr2.eval(struct);
if (value == null || value2 == null) {
return false;
}
Comparator<T> cmp = Comparators.forType(valueExpr.ref().type().asPrimitiveType());
return cmp.compare(value, value2) > 0;
}

@Override
public <T> Boolean gtEq(Bound<T> valueExpr, Literal<T> lit) {
Comparator<T> cmp = lit.comparator();
return cmp.compare(valueExpr.eval(struct), lit.value()) >= 0;
}

@Override
public <T> Boolean gtEq(Bound<T> valueExpr, Bound<T> valueExpr2) {
validateDataTypes(valueExpr, valueExpr2);
T value = valueExpr.eval(struct);
T value2 = valueExpr2.eval(struct);
if (value == null || value2 == null) {
return false;
}
Comparator<T> cmp = Comparators.forType(valueExpr.ref().type().asPrimitiveType());
return cmp.compare(value, value2) >= 0;
}

@Override
public <T> Boolean eq(Bound<T> valueExpr, Literal<T> lit) {
Comparator<T> cmp = lit.comparator();
return cmp.compare(valueExpr.eval(struct), lit.value()) == 0;
}

@Override
public <T> Boolean eq(Bound<T> valueExpr, Bound<T> valueExpr2) {
validateDataTypes(valueExpr, valueExpr2);
T value = valueExpr.eval(struct);
T value2 = valueExpr2.eval(struct);
if (value == null || value2 == null) {
return false;
}
Comparator<T> cmp = Comparators.forType(valueExpr.ref().type().asPrimitiveType());
return cmp.compare(value, value2) == 0;
}

@Override
public <T> Boolean notEq(Bound<T> valueExpr, Literal<T> lit) {
return !eq(valueExpr, lit);
}

@Override
public <T> Boolean notEq(Bound<T> valueExpr, Bound<T> valueExpr2) {
validateDataTypes(valueExpr, valueExpr2);
T value = valueExpr.eval(struct);
T value2 = valueExpr2.eval(struct);
if (value == null || value2 == null) {
return false;
}
Comparator<T> cmp = Comparators.forType(valueExpr.ref().type().asPrimitiveType());
return !(cmp.compare(value, value2) == 0);
}

@Override
public <T> Boolean in(Bound<T> valueExpr, Set<T> literalSet) {
return literalSet.contains(valueExpr.eval(struct));
Expand All @@ -156,5 +229,15 @@ public <T> Boolean startsWith(Bound<T> valueExpr, Literal<T> lit) {
public <T> Boolean notStartsWith(Bound<T> valueExpr, Literal<T> lit) {
return !startsWith(valueExpr, lit);
}

private <T> void validateDataTypes(Bound<T> valueExpr, Bound<T> valueExpr2) {
if (valueExpr.ref().type().typeId() != valueExpr2.ref().type().typeId()) {
throw new IllegalArgumentException(
"Cannot compare different types: "
+ valueExpr.ref().type()
+ " and "
+ valueExpr2.ref().type());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ public <T> Expression predicate(BoundPredicate<T> pred) {
@Override
@SuppressWarnings("unchecked")
public <T> Expression predicate(UnboundPredicate<T> pred) {
if (pred.rightTerm() != null) {
return new UnboundPredicate<>(pred.op(), pred.term(), pred.rightTerm());
}
switch (pred.op()) {
case IS_NULL:
case NOT_NULL:
Expand Down Expand Up @@ -390,6 +393,9 @@ private String value(BoundLiteralPredicate<?> pred) {
@Override
public <T> String predicate(BoundPredicate<T> pred) {
String term = describe(pred.term());
if (pred.rightTerm() != null) {
return termPredicate(pred, term);
}
switch (pred.op()) {
case IS_NULL:
return term + " IS NULL";
Expand Down Expand Up @@ -439,9 +445,33 @@ public <T> String predicate(BoundPredicate<T> pred) {
}
}

private static String termPredicate(Predicate pred, String term) {
String rightTerm = describe(pred.rightTerm());
switch (pred.op()) {
case LT:
return term + " < " + rightTerm;
case LT_EQ:
return term + " <= " + rightTerm;
case GT:
return term + " > " + rightTerm;
case GT_EQ:
return term + " >= " + rightTerm;
case EQ:
return term + " = " + rightTerm;
case NOT_EQ:
return term + " != " + rightTerm;
default:
throw new UnsupportedOperationException(
"Cannot sanitize unsupported predicate type: " + pred.op());
}
}

@Override
public <T> String predicate(UnboundPredicate<T> pred) {
String term = describe(pred.term());
if (pred.rightTerm() != null) {
return termPredicate(pred, term);
}
switch (pred.op()) {
case IS_NULL:
return term + " IS NULL";
Expand Down
Loading