From 85ec98fd674d11b091792a27ab5b39e3720c8094 Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Sun, 16 May 2021 09:00:23 -0400 Subject: [PATCH] Kafka tests working --- .../store/kafka/KafkaFilterPushdownTest.java | 225 +++++++++--------- .../exec/store/kafka/KafkaQueriesTest.java | 13 +- .../drill/exec/store/kafka/KafkaTestBase.java | 2 +- 3 files changed, 116 insertions(+), 124 deletions(-) diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java index ee4ceebf376..39b1049cac4 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java @@ -19,7 +19,6 @@ import org.apache.drill.categories.KafkaStorageTest; import org.apache.drill.categories.SlowTest; -import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.BeforeClass; import org.junit.Test; @@ -39,7 +38,7 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { public static void setup() throws Exception { TestKafkaSuit.createTopicHelper(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_PARTITIONS); KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), - StringSerializer.class); + StringSerializer.class); generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG); String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_PUSHDOWN_TOPIC); //Ensure messages are present @@ -56,14 +55,14 @@ public void testPushdownOnOffset() throws Exception { final int expectedRowCount = 5; //1 * NUM_PARTITIONS final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); runKafkaSQLVerifyCount(queryString, expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); } /** @@ -75,14 +74,14 @@ public void testPushdownOnPartition() throws Exception { final int expectedRowCount = NUM_JSON_MSG; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); runKafkaSQLVerifyCount(queryString, expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); } /** @@ -94,14 +93,14 @@ public void testPushdownOnTimestamp() throws Exception { final int expectedRowCount = 20; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + 
.include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); } /** @@ -114,14 +113,14 @@ public void testPushdownUnorderedTimestamp() throws Exception { final int expectedRowCount = 5; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowInPlan)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowInPlan)) + .match(); } /** @@ -133,14 +132,14 @@ public void testPushdownWhenTimestampDoesNotExist() throws Exception { final int expectedRowCount = 0; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); } /** @@ -152,14 +151,14 @@ public void testPushdownWhenPartitionDoesNotExist() throws Exception { final int expectedRowCount = 0; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); } /** @@ -172,14 +171,14 @@ public void testPushdownForEmptyScanSpec() throws Exception { final int expectedRowCount = 0; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); } /** @@ -192,69 +191,69 @@ public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws E //"equal" such that value = endOffset String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10"); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10"); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); //"equal" such that value < startOffset queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1"); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1"); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - 
.jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); //"greater_than" such that value = endOffset-1 queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9"); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9"); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); //"greater_than_or_equal" such that value = endOffset queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10"); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10"); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); //"less_than" such that value = startOffset queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0"); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0"); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); //"less_than_or_equal" such that value < startOffset queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1"); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1"); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); } /** @@ -267,36 +266,36 @@ public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws E //"equal" such that value = endOffset-1 String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9"); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9"); runKafkaSQLVerifyCount(queryString, expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); //"greater_than" such that value = endOffset-2 queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8"); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8"); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + 
.include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); //"greater_than_or_equal" such that value = endOffset-1 queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9"); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9"); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); } /** @@ -310,14 +309,14 @@ public void testPushdownWithOr() throws Exception { final int expectedRowCount = 26; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_OR, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); } /** @@ -331,14 +330,14 @@ public void testPushdownWithOr1() throws Exception { final int expectedRowCount = 10; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_OR, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowInPlan)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowInPlan)) + .match(); } /** @@ -352,14 +351,14 @@ public void testPushdownWithAndOrCombo() throws Exception { final int expectedRowCount = 8; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_1, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCount)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCount)) + .match(); } /** @@ -375,14 +374,14 @@ public void testPushdownWithAndOrCombo2() throws Exception { final int expectedRowCount = 4; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_3, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan)) + .match(); } /** @@ -397,14 +396,14 @@ public void testPushdownTimestampWithNonMetaField() throws Exception { final int expectedRowCount = 10; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); + 
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan)) + .match(); } /** @@ -420,13 +419,13 @@ public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception { final int expectedRowCount = 30; final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_2, - TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3); + TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3); runKafkaSQLVerifyCount(queryString,expectedRowCount); queryBuilder() - .sql(queryString) - .jsonPlanMatcher() - .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan)) - .match(); + .sql(queryString) + .jsonPlanMatcher() + .include(String.format(EXPECTED_PATTERN, expectedRowCountInPlan)) + .match(); } -} +} \ No newline at end of file diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java index 2d102720ff6..dd0740deb1c 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java @@ -21,7 +21,6 @@ import org.apache.drill.categories.SlowTest; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.drill.exec.rpc.RpcException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -29,12 +28,9 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Assert; -import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.runners.MethodSorters; -import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -43,7 +39,6 @@ import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster; import static org.junit.Assert.fail; -@FixMethodOrder(MethodSorters.JVM) @Category({KafkaStorageTest.class, SlowTest.class}) public class KafkaQueriesTest extends KafkaTestBase { @@ -74,8 +69,6 @@ public void testPartitionMinOffset() throws Exception { Map startOffsetsMap = fetchOffsets(-2); String queryString = String.format(TestQueryConstants.MIN_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC); - RowSet results = client.queryBuilder().sql(queryString).rowSet(); - testBuilder() .sqlQuery(queryString) .unOrdered() @@ -107,7 +100,7 @@ public void testInformationSchema() throws Exception { private Map fetchOffsets(int flag) { Consumer kafkaConsumer = null; try { - kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(), + kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); Map offsetsMap = new HashMap<>(); @@ -116,7 +109,7 @@ private Map fetchOffsets(int flag) { // evaluates lazily, seeking to the // first/last offset in all partitions only when poll(long) or // 
position(TopicPartition) are called - kafkaConsumer.poll(Duration.ofSeconds(0)); + kafkaConsumer.poll(0); Set assignments = kafkaConsumer.assignment(); if (flag == -2) { @@ -252,4 +245,4 @@ public void testEscapeAnyChar() throws Exception { client.resetSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR); } } -} +} \ No newline at end of file diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java index 56e8138ab9a..c233b1f4d40 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java @@ -76,4 +76,4 @@ public static void tearDownKafkaTestBase() { TestKafkaSuit.tearDownCluster(); } } -} +} \ No newline at end of file
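
Note on the KafkaFilterPushdownTest.java hunks above: most of them reindent the same two-step assertion, so it may help to see that pattern once in isolation — run the query and verify the row count, then re-plan it and verify that the serialized plan contains the EXPECTED_PATTERN fragment carrying the pushed-down row count. The chained calls below are taken verbatim from the diff; the wrapping verifyPushdown method name is invented for illustration and does not exist in the test class.

```java
// Hypothetical helper sketching the assertion pattern repeated throughout the diff.
// expectedRowCount   - rows the query must return
// expectedRowsInPlan - row count that must appear in the Kafka scan spec of the plan
private void verifyPushdown(String queryString, int expectedRowCount, int expectedRowsInPlan)
    throws Exception {
  // Execute the query and verify the number of rows returned.
  runKafkaSQLVerifyCount(queryString, expectedRowCount);

  // Verify the filter was actually pushed into the Kafka scan by checking the
  // JSON physical plan for the expected row-count fragment.
  queryBuilder()
      .sql(queryString)
      .jsonPlanMatcher()
      .include(String.format(EXPECTED_PATTERN, expectedRowsInPlan))
      .match();
}
```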
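
Note on the fetchOffsets(int) helper in KafkaQueriesTest.java: the inline comment explains that seekToBeginning/seekToEnd evaluate lazily, which is why the helper must call poll() (switched here to the long overload, poll(0)) before position() reports the sought offset. For comparison, the sketch below obtains the same per-partition start/end offsets through the consumer's beginningOffsets/endOffsets API, which needs no subscribe-poll-seek sequence. This is an illustrative alternative under assumed kafka-clients 2.x, not the code in the patch; the fetchTopicOffsets name and its parameters are made up for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicOffsetsSketch {

  /**
   * Returns the start (earliest) or end (high-watermark) offset of every partition
   * of the given topic -- the same values fetchOffsets(-2) / fetchOffsets(-1)
   * computes in KafkaQueriesTest -- without subscribing or seeking.
   */
  static Map<TopicPartition, Long> fetchTopicOffsets(Properties consumerProps,
                                                     String topic,
                                                     boolean fromBeginning) {
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
        consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {

      // Discover the topic's partitions from cluster metadata.
      List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
          .map(p -> new TopicPartition(p.topic(), p.partition()))
          .collect(Collectors.toList());

      // beginningOffsets/endOffsets query the brokers directly, so there is no
      // lazy-seek evaluation to force with poll() or position().
      return fromBeginning
          ? consumer.beginningOffsets(partitions)
          : consumer.endOffsets(partitions);
    }
  }
}
```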