Remove index_realtime and index_realtime_appenderator tasks #16602

Merged: 27 commits merged into master from realtime-rampage on Jun 25, 2024

Changes shown are from 1 commit (42680fe, "IT WIP")

Commits (27):
60ceeed  Remove index_realtime and index_realtime_appenderator tasks.  (gianm, Jan 18, 2024)
2548eeb  Updates.  (gianm, Jan 18, 2024)
abfef32  Style.  (gianm, Jan 18, 2024)
3396f17  Fixes for static analysis.  (gianm, Jan 18, 2024)
c773fcd  Fix test.  (gianm, Jan 18, 2024)
7fd4f4f  Updates for static analysis, ITs.  (gianm, Jan 18, 2024)
a319f16  Fix test.  (gianm, Jan 18, 2024)
676dd4c  Fixes.  (gianm, Jan 19, 2024)
7f39083  Remove function.  (gianm, Jan 19, 2024)
0e77cd2  Merge branch 'master' into realtime-rampage  (gianm, Jan 19, 2024)
1fbb38e  Merge branch 'master' into realtime-rampage  (gianm, Jan 20, 2024)
42680fe  IT WIP  (gianm, Jan 20, 2024)
cfb4faf  delete  (gianm, Jan 20, 2024)
c85f6ff  Merge branch 'master' into realtime-rampage  (gianm, Mar 20, 2024)
d7ec2cf  Merge branch 'master' into realtime-rampage  (gianm, Mar 20, 2024)
9a90795  Fix style.  (gianm, Mar 20, 2024)
2eff3ea  Merge remote-tracking branch 'upstream/master' into realtime-rampage  (clintropolis, Jun 14, 2024)
7e6d38b  fix inspection, add union query integration test using new framework  (clintropolis, Jun 14, 2024)
8499c4a  style  (clintropolis, Jun 14, 2024)
e165efd  style again  (clintropolis, Jun 14, 2024)
7d663fd  dependencies  (clintropolis, Jun 14, 2024)
b3dceea  adjust  (clintropolis, Jun 14, 2024)
f2401cd  revert some unintended changes  (clintropolis, Jun 14, 2024)
aaf2812  Merge remote-tracking branch 'upstream/master' into realtime-rampage  (clintropolis, Jun 14, 2024)
9f3da17  fix test  (clintropolis, Jun 14, 2024)
e06647a  maybe fix tests  (clintropolis, Jun 14, 2024)
f3ce50b  missed a spot  (clintropolis, Jun 14, 2024)
Viewing commit 42680fe6fd254e27ba0a2b5b60db2ac829a3eba5: "IT WIP"
gianm committed Jan 20, 2024
@@ -72,7 +72,7 @@ services:
   # The image will intialize the user and DB upon first start.
   metadata:
     # Uncomment the following when running on M1 Macs:
-    # platform: linux/x86_64
+    platform: linux/x86_64
     image: mysql:$MYSQL_IMAGE_VERSION
     container_name: metadata
     labels:
integration-tests-ex/cases/cluster/Query/docker-compose.yaml (new file: 105 additions, 0 deletions)
@@ -0,0 +1,105 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

networks:
  druid-it-net:
    name: druid-it-net
    ipam:
      config:
        - subnet: 172.172.172.0/24

services:
  zookeeper:
    extends:
      file: ../Common/dependencies.yaml
      service: zookeeper

  metadata:
    extends:
      file: ../Common/dependencies.yaml
      service: metadata

  coordinator:
    extends:
      file: ../Common/druid.yaml
      service: coordinator
    container_name: coordinator
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
      # The frequency with which the coordinator polls the database
      # for changes. The DB population code has to wait at least this
      # long for the coordinator to notice changes.
      - druid_manager_segments_pollDuration=PT5S
      - druid_coordinator_period=PT10S
    depends_on:
      - zookeeper
      - metadata

  overlord:
    extends:
      file: ../Common/druid.yaml
      service: overlord
    container_name: overlord
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - zookeeper
      - metadata

  broker:
    extends:
      file: ../Common/druid.yaml
      service: broker
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - zookeeper

  router:
    extends:
      file: ../Common/druid.yaml
      service: router
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - zookeeper

  historical:
    extends:
      file: ../Common/druid.yaml
      service: historical
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - zookeeper

  middlemanager:
    extends:
      file: ../Common/druid.yaml
      service: middlemanager
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    volumes:
      # Test data
      - ../../resources:/resources
    depends_on:
      - zookeeper

  kafka:
    extends:
      file: ../Common/dependencies.yaml
      service: kafka
    depends_on:
      - zookeeper
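
Because the coordinator only notices metadata changes on each poll (the PT5S pollDuration configured above), test code waits for segments to become queryable rather than asserting immediately after ingestion. Below is a minimal sketch of that wait, assuming the ITRetryUtil helper and the injected CoordinatorResourceTestClient used by the tests in this PR; the class name and datasource argument are hypothetical, and the parameter values mirror the calls in ITUnionQueryTest further down.

import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.utils.ITRetryUtil;

public class SegmentWaitSketch
{
  // Supplied by the test framework via @Inject in real tests.
  private CoordinatorResourceTestClient coordinator;

  void waitForSegments(String datasource)
  {
    // Retry until the coordinator's periodic metadata poll has picked up
    // the new segments and reports them as loaded.
    ITRetryUtil.retryUntil(
        () -> coordinator.areSegmentsLoaded(datasource),
        true,     // expected result
        10000,    // delay between attempts, in milliseconds
        10,       // maximum number of attempts
        "Segments loaded after coordinator poll"
    );
  }
}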
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/Query.java (new file: 24 additions, 0 deletions)
@@ -0,0 +1,24 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.testsEx.categories;

public class Query
{
}
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java (new file: 245 additions, 0 deletions)
@@ -0,0 +1,245 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.testsEx.query;

import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.testing.clients.EventReceiverFirehoseTestClient;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.ServerDiscoveryUtil;
import org.apache.druid.testsEx.categories.Query;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.apache.druid.testsEx.indexer.AbstractITBatchIndexTest;
import org.apache.druid.testsEx.indexer.AbstractIndexerTest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

@RunWith(DruidTestRunner.class)
@Category(Query.class)
public class ITUnionQueryTest extends AbstractIndexerTest
{
  private static final Logger LOG = new Logger(ITUnionQueryTest.class);
  private static final String UNION_TASK_RESOURCE = "/indexer/wikipedia_union_index_task.json";
  private static final String EVENT_RECEIVER_SERVICE_PREFIX = "eventReceiverServiceName";
  private static final String UNION_DATA_FILE = "/data/union_query/wikipedia_index_data.json";
  private static final String UNION_QUERIES_RESOURCE = "/queries/union_queries.json";
  private static final String UNION_DATASOURCE = "wikipedia_index_test";

  @Inject
  ServerDiscoveryFactory factory;

  @Inject
  @TestClient
  HttpClient httpClient;

  private String fullDatasourceName;

  @Before
  public void setFullDatasourceName()
  {
    fullDatasourceName = UNION_DATASOURCE + config.getExtraDatasourceNameSuffix();
  }

  @Test
  public void testUnionQuery() throws IOException
  {
    final int numTasks = 3;
    final Closer closer = Closer.create();
    for (int i = 0; i < numTasks; i++) {
      closer.register(unloader(fullDatasourceName + i));
    }
    try {
      // Load 3 datasources with the same dimensions
      String task = setShutOffTime(
          getResourceAsString(UNION_TASK_RESOURCE),
          DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
      );
      List<String> taskIDs = new ArrayList<>();
      for (int i = 0; i < numTasks; i++) {
        taskIDs.add(
            indexer.submitTask(
                withServiceName(
                    withDataSource(task, fullDatasourceName + i),
                    EVENT_RECEIVER_SERVICE_PREFIX + i
                )
            )
        );
      }
      for (int i = 0; i < numTasks; i++) {
        postEvents(i);
      }

      // wait until all events are ingested
      ITRetryUtil.retryUntil(
          () -> {
            for (int i = 0; i < numTasks; i++) {
              final int countRows = queryHelper.countRows(
                  fullDatasourceName + i,
                  Intervals.of("2013-08-31/2013-09-01"),
                  name -> new LongSumAggregatorFactory(name, "count")
              );

              // there are 10 rows, but the query only covers the first 5
              if (countRows < 5) {
                LOG.warn("%d events have been ingested to %s so far", countRows, fullDatasourceName + i);
                return false;
              }
            }
            return true;
          },
          true,   // expected result
          1000,   // delay between attempts, in milliseconds
          100,    // maximum number of attempts
          "Waiting until all events are ingested"
      );

      // should hit the queries on the realtime task
      LOG.info("Running Union Queries..");

      String queryResponseTemplate;
      try {
        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(UNION_QUERIES_RESOURCE);
        queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
      }
      catch (IOException e) {
        throw new ISE(e, "could not read query file: %s", UNION_QUERIES_RESOURCE);
      }

      queryResponseTemplate = StringUtils.replace(
          queryResponseTemplate,
          "%%DATASOURCE%%",
          fullDatasourceName
      );

      this.queryHelper.testQueriesFromString(queryResponseTemplate);

      // wait for the tasks to complete
      for (int i = 0; i < numTasks; i++) {
        indexer.waitUntilTaskCompletes(taskIDs.get(i));
      }
      // tasks should complete only after the segments are loaded by historical nodes
      for (int i = 0; i < numTasks; i++) {
        final int taskNum = i;
        ITRetryUtil.retryUntil(
            () -> coordinator.areSegmentsLoaded(fullDatasourceName + taskNum),
            true,    // expected result
            10000,   // delay between attempts, in milliseconds
            10,      // maximum number of attempts
            "Real-time generated segments loaded"
        );
      }
      // run queries on historical nodes
      this.queryHelper.testQueriesFromString(queryResponseTemplate);
    }
    catch (Throwable e) {
      throw closer.rethrow(e);
    }
    finally {
      closer.close();
    }
  }

  private String setShutOffTime(String taskAsString, DateTime time)
  {
    return StringUtils.replace(taskAsString, "#SHUTOFFTIME", time.toString());
  }

  private String withDataSource(String taskAsString, String dataSource)
  {
    return StringUtils.replace(taskAsString, "%%DATASOURCE%%", dataSource);
  }

  private String withServiceName(String taskAsString, String serviceName)
  {
    return StringUtils.replace(taskAsString, EVENT_RECEIVER_SERVICE_PREFIX, serviceName);
  }

  // Posts the union test data file to the event receiver embedded in the
  // realtime task with the given id, locating it via service discovery.
  private void postEvents(int id) throws Exception
  {
    final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_PREFIX + id);
    eventReceiverSelector.start();
    try {
      ServerDiscoveryUtil.waitUntilInstanceReady(eventReceiverSelector, "Event Receiver");
      // Access the Docker-mapped host and port instead of the service announced in ZooKeeper
      String host = config.getMiddleManagerHost() + ":" + eventReceiverSelector.pick().getPort();

      LOG.info("Event Receiver Found at host [%s]", host);

      LOG.info("Checking worker /status/health for [%s]", host);
      ITRetryUtil.retryUntilTrue(
          () -> {
            try {
              StatusResponseHolder response = httpClient.go(
                  new Request(HttpMethod.GET, new URL(StringUtils.format("https://%s/status/health", host))),
                  StatusResponseHandler.getInstance()
              ).get();
              return response.getStatus().equals(HttpResponseStatus.OK);
            }
            catch (Throwable e) {
              LOG.error(e, "");
              return false;
            }
          },
          StringUtils.format("Checking /status/health for worker [%s]", host)
      );
      LOG.info("Finished checking worker /status/health for [%s], success", host);

      EventReceiverFirehoseTestClient client = new EventReceiverFirehoseTestClient(
          host,
          EVENT_RECEIVER_SERVICE_PREFIX + id,
          jsonMapper,
          httpClient,
          smileMapper
      );
      client.postEventsFromFile(UNION_DATA_FILE);
    }
    finally {
      eventReceiverSelector.stop();
    }
  }
}
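
The try/catch/finally shape around Closer in testUnionQuery is the standard cleanup idiom in these tests: unloaders are registered up front so the datasources are removed even when the test body throws. A standalone sketch of the idiom, assuming only Druid's Closer utility; the class name is hypothetical and the printlns stand in for real cleanup work.

import org.apache.druid.java.util.common.io.Closer;

public class CloserPatternSketch
{
  public static void main(String[] args) throws Exception
  {
    final Closer closer = Closer.create();
    // Register cleanup up front; close() runs registered resources in reverse order.
    closer.register(() -> System.out.println("unload datasource 0"));
    closer.register(() -> System.out.println("unload datasource 1"));
    try {
      // ... test body that may throw ...
    }
    catch (Throwable t) {
      // rethrow() records the primary failure so errors during close() don't mask it.
      throw closer.rethrow(t);
    }
    finally {
      closer.close();
    }
  }
}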