Commit

Merge qa test to integration test (NVIDIA#172)
* merge qa test to integration test

* add run control for qa test and update sql

* Update build script for qatest
shotai authored Jun 16, 2020
1 parent 4272714 commit 63beed2
Showing 8 changed files with 1,015 additions and 5 deletions.
3 changes: 2 additions & 1 deletion integration_tests/pom.xml
@@ -151,7 +151,8 @@
<environmentVariables>
<SKIP_TESTS>${skipTests}</SKIP_TESTS>
<TEST>${test}</TEST>
<COVERAGE_SUBMIT_FLAGS>${argLine}</COVERAGE_SUBMIT_FLAGS>
<COVERAGE_SUBMIT_FLAGS>${argLine}</COVERAGE_SUBMIT_FLAGS>
<TEST_TAGS>${pytest.TEST_TAGS}</TEST_TAGS>
</environmentVariables>
</configuration>
</execution>
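The new TEST_TAGS environment variable is populated from the Maven property pytest.TEST_TAGS, so a marker expression can be passed down from the build. A hypothetical invocation, assuming the integration tests are still launched through the existing Maven exec step (only the pytest.TEST_TAGS property name comes from the diff above):

    cd integration_tests
    mvn package -Dpytest.TEST_TAGS=qarun   # or whichever phase normally triggers the integration run
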
2 changes: 2 additions & 0 deletions integration_tests/pytest.ini
@@ -19,3 +19,5 @@ markers =
ignore_order(local): Ignores the order of the result in asserts. If local is true the results are sorted in python instead of using spark.
incompat: Enable incompat operators
limit(num_rows): Limit the number of rows that will be checked in a result
qarun: Mark qa test
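Registering the marker lets pytest select or skip the QA suite with a standard -m expression. A minimal sketch of the selection semantics (in this project pytest is normally driven through runtests.py and spark-submit, as shown in the script below):

    pytest -m qarun ...          # collect only tests marked @qarun
    pytest -m "not qarun" ...    # skip the QA tests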

6 changes: 5 additions & 1 deletion integration_tests/run_pyspark_from_build.sh
@@ -31,5 +31,9 @@ else
then
TEST_ARGS="-k $TEST"
fi
"$SPARK_HOME"/bin/spark-submit --jars "${ALL_JARS// /,}" --conf "spark.driver.extraJavaOptions=-Duser.timezone=GMT $COVERAGE_SUBMIT_FLAGS" --conf 'spark.executor.extraJavaOptions=-Duser.timezone=GMT' --conf 'spark.sql.session.timeZone=UTC' --conf 'spark.sql.shuffle.partitions=12' $SPARK_SUBMIT_FLAGS ./runtests.py -v -rfExXs --std_input_path=./src/test/resources/ "$TEST_ARGS" $RUN_TEST_PARAMS "$@"
if [[ "${TEST_TAGS}" != "" ]];
then
TEST_TAGS="-m $TEST_TAGS"
fi
"$SPARK_HOME"/bin/spark-submit --jars "${ALL_JARS// /,}" --conf "spark.driver.extraJavaOptions=-Duser.timezone=GMT $COVERAGE_SUBMIT_FLAGS" --conf 'spark.executor.extraJavaOptions=-Duser.timezone=GMT' --conf 'spark.sql.session.timeZone=UTC' --conf 'spark.sql.shuffle.partitions=12' $SPARK_SUBMIT_FLAGS ./runtests.py -v -rfExXs "$TEST_TAGS" --std_input_path=./src/test/resources/ "$TEST_ARGS" $RUN_TEST_PARAMS "$@"
fi
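The script now turns a non-empty TEST_TAGS value into a pytest -m marker expression before invoking runtests.py. A hypothetical direct use of the new run control, assuming the environment the script already expects (SPARK_HOME and the built jars) is in place:

    TEST_TAGS="qarun" ./run_pyspark_from_build.sh   # run only the @qarun-marked QA tests
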
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/marks.py
@@ -19,4 +19,4 @@
ignore_order = pytest.mark.ignore_order
incompat = pytest.mark.incompat
limit = pytest.mark.limit

qarun = pytest.mark.qarun
194 changes: 194 additions & 0 deletions integration_tests/src/main/python/qa_nightly_select_test.py
@@ -0,0 +1,194 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext, SQLContext
import datetime
from argparse import ArgumentParser
from decimal import Decimal
from asserts import assert_gpu_and_cpu_are_equal_collect
from qa_nightly_sql import *
import pytest
from spark_session import spark as s
from marks import approximate_float, ignore_order, incompat, qarun

def num_stringDf(spark):
print("### CREATE DATAFRAME 1 ####")
schema = StructType([StructField("strF", StringType()),
StructField("byteF", ByteType()),
StructField("shortF", ShortType()),
StructField("intF", IntegerType()),
StructField("longF", LongType()),
StructField("floatF", FloatType()),
StructField("doubleF", DoubleType()),
StructField("decimalF", DoubleType()),
StructField("booleanF", BooleanType()),
StructField("timestampF", TimestampType()),
StructField("dateF", DateType())])
dt = datetime.date(1990, 1, 1)
print(dt)
tm = datetime.datetime(2020,2,1,12,1,1)

data = [("FIRST", None, 500, 1200, 10, 10.001, 10.0003, 1.01, True, tm, dt),
("sold out", 20, 600, None, 20, 20.12, 2.000013, 2.01, True, tm, dt),
("take out", 20, 600, None, 20, 20.12, 2.000013, 2.01, True, tm, dt),
("Yuan", 20, 600, 2200, None, 20.12, 2.000013, 2.01, False, tm, dt),
("Alex", 30, 700, 3200, 30, None, 3.000013, 2.01, True, None, dt),
("Satish", 30, 700, 3200, 30, 30.12, None, 3.01, False, tm, dt),
("Gary", 40, 800, 4200, 40, 40.12, 4.000013, None, False, tm, dt),
("NVIDIA", 40, 800, 4200, -40, 40.12, 4.00013, 4.01, None, tm, dt),
("Mellanox", 40, 800, 4200, -20, -20.12, 4.00013, 4.01, False,None, dt),
(None, 30, 500, -3200, -20, 2.012, 4.000013, -4.01, False, tm, None),
("NVIDIASPARKTEAM", 0, 500, -3200, -20, 2.012, 4.000013, -4.01, False, tm, dt),
("NVIDIASPARKTEAM", 20, 0, -3200, -20, 2.012, 4.000013, -4.01, False, tm, dt),
("NVIDIASPARKTEAM", 0, 50, 0, -20, 2.012, 4.000013, -4.01, False, tm, dt),
(None, 0, 500, -3200, 0, 0.0, 0.0, -4.01, False, tm, dt),
("phuoc", 30, 500, 3200, -20, 20.12, 4.000013, 4.01, False, tm, dt)]
df = spark.createDataFrame(data,schema=schema)
df.createOrReplaceTempView("test_table")


# create dataframe for join & union operation testing
def num_stringDf_two(spark):
print("### CREATE DATAFRAME TWO ####")
schema = StructType([StructField("strF", StringType()),
StructField("byteF", ByteType()),
StructField("shortF", ShortType()),
StructField("intF", IntegerType()),
StructField("longF", LongType()),
StructField("floatF", FloatType()),
StructField("doubleF", DoubleType()),
StructField("decimalF", DoubleType()),
StructField("booleanF", BooleanType()),
StructField("timestampF", TimestampType()),
StructField("dateF", DateType())])

dt = datetime.date(2000, 1, 1)
print(dt)
tm = datetime.datetime(2022,12,1,12,1,1)
data = [("AL", 10, 500, 1200, 10, 10.001, 10.0003, 1.01, True, tm, dt),
("Jhon", 20, 600, 2200, 20, 20.12, 2.000013, 2.01, True, tm, dt),
("Alex", 30, 700, 3200, 30, 30.12, 3.000013, 3.01, True, tm, dt),
("Satish", 30, 700, 3200, 30, 30.12, 3.000013, 3.01, False, tm, dt),
("Kary", 40, 800, 4200, 40, 40.12, 4.000013, 4.01, False, tm, dt),
(None, 40, 800, 4200, -40, 40.12, 4.00013, 4.01, False, tm, dt),
(None, 40, 800, 4200, -20, -20.12, 4.00013, 4.01, False, tm, dt),
(None, 30, 500, -3200, -20, 2.012, 4.000013, -4.01, False, tm, dt),
("phuoc", 30, 500, 3200, -20, 20.12, 4.000013, 4.01, False, tm, dt)]

df = spark.createDataFrame(data, schema=schema)
df.createOrReplaceTempView("test_table1")

def num_stringDf_first_last(spark, field_name):
print("### CREATE DATAFRAME 1 ####")
schema = StructType([StructField("strF", StringType()),
StructField("byteF", ByteType()),
StructField("shortF", ShortType()),
StructField("intF", IntegerType()),
StructField("longF", LongType()),
StructField("floatF", FloatType()),
StructField("doubleF", DoubleType()),
StructField("decimalF", DoubleType()),
StructField("booleanF", BooleanType()),
StructField("timestampF", TimestampType()),
StructField("dateF", DateType())])
dt = datetime.date(1990, 1, 1)
print(dt)
tm = datetime.datetime(2020,2,1,12,1,1)

data = [("FIRST", None, 500, 1200, 10, 10.001, 10.0003, 1.01, True, tm, dt),
("sold out", 20, 600, None, 20, 20.12, 2.000013, 2.01, True, tm, dt),
("take out", 20, 600, None, 20, 20.12, 2.000013, 2.01, True, tm, dt),
("Yuan", 20, 600, 2200, None, 20.12, 2.000013, 2.01, False, tm, dt),
("Alex", 30, 700, 3200, 30, None, 3.000013, 2.01, True, None, dt),
("Satish", 30, 700, 3200, 30, 30.12, None, 3.01, False, tm, dt),
("Gary", 40, 800, 4200, 40, 40.12, 4.000013, None, False, tm, dt),
("NVIDIA", 40, 800, 4200, -40, 40.12, 4.00013, 4.01, None, tm, dt),
("Mellanox", 40, 800, 4200, -20, -20.12, 4.00013, 4.01, False,None, dt),
(None, 30, 500, -3200, -20, 2.012, 4.000013, -4.01, False, tm, None),
("NVIDIASPARKTEAM", 0, 500, -3200, -20, 2.012, 4.000013, -4.01, False, tm, dt),
("NVIDIASPARKTEAM", 20, 0, -3200, -20, 2.012, 4.000013, -4.01, False, tm, dt),
("NVIDIASPARKTEAM", 0, 50, 0, -20, 2.012, 4.000013, -4.01, False, tm, dt),
(None, 0, 500, -3200, 0, 0.0, 0.0, -4.01, False, tm, dt),
("phuoc", 30, 500, 3200, -20, 20.12, 4.000013, 4.01, False, tm, dt)]
df = spark.createDataFrame(data,schema=schema).repartition(1).orderBy(field_name)
df.createOrReplaceTempView("test_table")

# SQL entries are tuples: element [0] is the query text and element [1] is used as the pytest test id
def idfn(val):
    return val[1]

# Spark confs applied to every QA query: declare the data NaN-free and turn on
# RAPIDS operators that are disabled by default
_qa_conf = {
    'spark.rapids.sql.variableFloatAgg.enabled': 'true',
    'spark.rapids.sql.hasNans': 'false',
    'spark.rapids.sql.castStringToFloat.enabled': 'true',
    'spark.rapids.sql.castFloatToString.enabled': 'true',
    'spark.rapids.sql.expression.InitCap': 'true',
    'spark.rapids.sql.expression.Lower': 'true',
    'spark.rapids.sql.expression.Upper': 'true',
    'spark.rapids.sql.expression.UnixTimestamp': 'true',
}



@approximate_float
@incompat
@ignore_order
@qarun
@pytest.mark.parametrize('sql_query_line', SELECT_SQL, ids=idfn)
def test_select(sql_query_line, pytestconfig):
    sql_query = sql_query_line[0]
    if sql_query:
        print(sql_query)
        num_stringDf(s)
        assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.sql(sql_query), conf=_qa_conf)

@approximate_float
@incompat
@ignore_order("local")
@qarun
@pytest.mark.parametrize('sql_query_line', SELECT_JOIN_SQL, ids=idfn)
def test_select_join(sql_query_line, pytestconfig):
    sql_query = sql_query_line[0]
    if sql_query:
        print(sql_query)
        num_stringDf(s)
        if ("UNION" in sql_query) or ("JOIN" in sql_query):
            num_stringDf_two(s)
        assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.sql(sql_query), conf=_qa_conf)

@approximate_float
@incompat
@ignore_order("local")
@qarun
@pytest.mark.parametrize('sql_query_line', SELECT_PRE_ORDER_SQL, ids=idfn)
def test_select_first_last(sql_query_line, pytestconfig):
    sql_query = sql_query_line[0]
    if sql_query:
        print(sql_query)
        num_stringDf_first_last(s, sql_query_line[2])
        assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.sql(sql_query).orderBy('res'), conf=_qa_conf)

@approximate_float(abs=1e-6)
@incompat
@ignore_order("local")
@qarun
@pytest.mark.parametrize('sql_query_line', SELECT_FLOAT_SQL, ids=idfn)
def test_select_float_order_local(sql_query_line, pytestconfig):
    sql_query = sql_query_line[0]
    if sql_query:
        print(sql_query)
        num_stringDf(s)
        assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.sql(sql_query), conf=_qa_conf)
