Skip to content

Commit

Permalink
Fix empty datasource schema on the Broker when metadata query is disa…
Browse files Browse the repository at this point in the history
…bled (#16645)

* Fix build

* Fix empty datasource schema on the broker

* review comment

* Remove unused import
  • Loading branch information
findingrish authored Jun 28, 2024
1 parent 45c0200 commit b9c7664
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ services:
service: druid-broker
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- druid_sql_planner_metadataRefreshPeriod=PT20S
- druid_sql_planner_metadataRefreshPeriod=PT30S
- druid_sql_planner_disableSegmentMetadataQueries=true
depends_on:
- druid-coordinator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.testng.Assert;

public final class DataLoaderHelper
{
Expand Down Expand Up @@ -50,6 +51,9 @@ public void waitUntilDatasourceIsReady(String datasource)
() -> sqlTestQueryHelper.isDatasourceLoadedInSQL(datasource),
StringUtils.format("Waiting for [%s] to be ready for SQL queries", datasource)
);

Assert.assertTrue(sqlTestQueryHelper.verifyTimeColumnIsPresent(datasource));

LOG.info("Datasource [%s] ready for SQL queries", datasource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,27 @@ public boolean isDatasourceLoadedInSQL(String datasource)
return false;
}
}

public boolean verifyTimeColumnIsPresent(String datasource)
{
final SqlQuery query = new SqlQuery(
"SELECT __time FROM \"" + datasource + "\" LIMIT 1",
null,
false,
false,
false,
null,
null
);

try {
//noinspection unchecked
queryClient.query(getQueryURL(broker), query);
return true;
}
catch (Exception e) {
LOG.debug(e, "Check query failed");
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
continue;
}

if (rowSignature.getColumnNames().isEmpty()) {
// this case could arise when metadata refresh is disabled on broker
// and a new datasource is added
log.info("datasource [%s] schema has not been initialized yet, "
+ "check coordinator logs if this message is persistent.", dataSource);
// this is a harmless call
tables.remove(dataSource);
continue;
}

final PhysicalDatasourceMetadata physicalDatasourceMetadata = dataSourceMetadataFactory.build(dataSource, rowSignature);
updateDSMetadata(dataSource, physicalDatasourceMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1027,4 +1027,28 @@ public void testInvokeSegmentSchemaAnnounced() throws InterruptedException
buildSchemaMarkAndTableLatch();
serverView.invokeSegmentSchemasAnnouncedDummy();
}

@Test
public void testNoDatasourceSchemaWhenNoSegmentMetadata() throws InterruptedException, IOException
{
BrokerSegmentMetadataCacheConfig config = new BrokerSegmentMetadataCacheConfig();
config.setDisableSegmentMetadataQueries(true);

BrokerSegmentMetadataCache schema = buildSchemaMarkAndTableLatch(
config,
new NoopCoordinatorClient()
);

schema.start();
schema.awaitInitialization();

List<DataSegment> segments = schema.getSegmentMetadataSnapshot().values()
.stream()
.map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList());

schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), Collections.singleton("foo"));

Assert.assertNull(schema.getDatasource("foo"));
}
}

0 comments on commit b9c7664

Please sign in to comment.