Skip to content

Commit

Permalink
ESQL: adds Enrich implicit match_fields to field_caps call (#101456)
Browse files Browse the repository at this point in the history
* Take into account the Enrich implicit match_field when resolving
field names of the source index
  • Loading branch information
astefan committed Oct 27, 2023
1 parent b3db181 commit 74da807
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 30 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/101456.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 101456
summary: "ESQL: adds Enrich implicit `match_fields` to `field_caps` call"
area: ES|QL
type: bug
issues:
- 101328
8 changes: 4 additions & 4 deletions docs/reference/esql/metadata-fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ like the other index fields:

[source.merge.styled,esql]
----
include::{esql-specs}/metadata-ignoreCsvTests.csv-spec[tag=multipleIndices]
include::{esql-specs}/metadata-IT_tests_only.csv-spec[tag=multipleIndices]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/metadata-ignoreCsvTests.csv-spec[tag=multipleIndices-result]
include::{esql-specs}/metadata-IT_tests_only.csv-spec[tag=multipleIndices-result]
|===

Also, similar to the index fields, once an aggregation is performed, a
Expand All @@ -47,9 +47,9 @@ used as grouping field:

[source.merge.styled,esql]
----
include::{esql-specs}/metadata-ignoreCsvTests.csv-spec[tag=metaIndexInAggs]
include::{esql-specs}/metadata-IT_tests_only.csv-spec[tag=metaIndexInAggs]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/metadata-ignoreCsvTests.csv-spec[tag=metaIndexInAggs-result]
include::{esql-specs}/metadata-IT_tests_only.csv-spec[tag=metaIndexInAggs-result]
|===
20 changes: 10 additions & 10 deletions docs/reference/esql/processing-commands/enrich.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,51 +49,51 @@ column for each enrich field defined in the policy. The match is performed using
the `match_field` defined in the <<esql-enrich-policy,enrich policy>> and
requires that the input table has a column with the same name (`language_code`
in this example). `ENRICH` will look for records in the
<<esql-enrich-index,enrich index>> based on the match field value.
<<esql-enrich-index,enrich index>> based on the match field value.

[source.merge.styled,esql]
----
include::{esql-specs}/docs-ignoreCsvTests.csv-spec[tag=enrich]
include::{esql-specs}/docs-IT_tests_only.csv-spec[tag=enrich]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/docs-ignoreCsvTests.csv-spec[tag=enrich-result]
include::{esql-specs}/docs-IT_tests_only.csv-spec[tag=enrich-result]
|===

To use a column with a different name than the `match_field` defined in the
policy as the match field, use `ON <column-name>`:

[source.merge.styled,esql]
----
include::{esql-specs}/docs-ignoreCsvTests.csv-spec[tag=enrich_on]
include::{esql-specs}/docs-IT_tests_only.csv-spec[tag=enrich_on]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/docs-ignoreCsvTests.csv-spec[tag=enrich_on-result]
include::{esql-specs}/docs-IT_tests_only.csv-spec[tag=enrich_on-result]
|===

By default, each of the enrich fields defined in the policy is added as a
column. To explicitly select the enrich fields that are added, use
column. To explicitly select the enrich fields that are added, use
`WITH <field1>, <field2>...`:

[source.merge.styled,esql]
----
include::{esql-specs}/docs-ignoreCsvTests.csv-spec[tag=enrich_with]
include::{esql-specs}/docs-IT_tests_only.csv-spec[tag=enrich_with]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/docs-ignoreCsvTests.csv-spec[tag=enrich_with-result]
include::{esql-specs}/docs-IT_tests_only.csv-spec[tag=enrich_with-result]
|===

You can rename the columns that are added using `WITH new_name=<field1>`:

[source.merge.styled,esql]
----
include::{esql-specs}/docs-ignoreCsvTests.csv-spec[tag=enrich_rename]
include::{esql-specs}/docs-IT_tests_only.csv-spec[tag=enrich_rename]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/docs-ignoreCsvTests.csv-spec[tag=enrich_rename-result]
include::{esql-specs}/docs-IT_tests_only.csv-spec[tag=enrich_rename-result]
|===

In case of name collisions, the newly created columns will override existing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.single_node;

import org.elasticsearch.xpack.esql.qa.rest.RestEnrichTestCase;

public class RestEnrichIT extends RestEnrichTestCase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.qa.rest;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsql;
import static org.hamcrest.Matchers.containsString;

public class RestEnrichTestCase extends ESRestTestCase {

private static final String sourceIndexName = "countries";
private static final String testIndexName = "test";
private static final String policyName = "countries";

@Before
@After
public void assertRequestBreakerEmpty() throws Exception {
EsqlSpecTestCase.assertRequestBreakerEmpty();
}

@Before
public void loadTestData() throws IOException {
Request request = new Request("PUT", "/" + testIndexName);
request.setJsonEntity("""
{
"mappings": {
"properties": {
"geo.dest": {
"type": "keyword"
},
"number": {
"type": "long"
}
}
}
}""");
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

request = new Request("POST", "/" + testIndexName + "/_bulk");
request.addParameter("refresh", "true");
request.setJsonEntity("""
{ "index": {"_id": 1} }
{ "geo.dest": "US", "number": 1000 }
{ "index": {"_id": 2} }
{ "geo.dest": "US", "number": 1000 }
{ "index": {"_id": 3} }
{ "geo.dest": "CN", "number": 5000 }
""");
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

request = new Request("PUT", "/" + sourceIndexName);
request.setJsonEntity("""
{
"mappings": {
"properties": {
"geo.dest": {
"type": "keyword"
},
"country_name": {
"type": "keyword"
}
}
}
}""");
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

request = new Request("POST", "/" + sourceIndexName + "/_bulk");
request.addParameter("refresh", "true");
request.setJsonEntity("""
{ "index" : {}}
{ "geo.dest": "US", "country_name": "United States of America" }
{ "index" : {}}
{ "geo.dest": "IN", "country_name": "India" }
{ "index" : {}}
{ "geo.dest": "CN", "country_name": "China" }
""");
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

request = new Request("PUT", "/_enrich/policy/" + policyName);
request.setJsonEntity("""
{
"match": {
"indices": "countries",
"match_field": "geo.dest",
"enrich_fields": ["country_name"]
}
}
""");
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

request = new Request("PUT", "/_enrich/policy/" + policyName + "/_execute");
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
}

@After
public void wipeTestData() throws IOException {
try {
var response = client().performRequest(new Request("DELETE", "/" + testIndexName));
assertEquals(200, response.getStatusLine().getStatusCode());
response = client().performRequest(new Request("DELETE", "/" + sourceIndexName));
assertEquals(200, response.getStatusLine().getStatusCode());
response = client().performRequest(new Request("DELETE", "/_enrich/policy/" + policyName));
assertEquals(200, response.getStatusLine().getStatusCode());
} catch (ResponseException re) {
assertEquals(404, re.getResponse().getStatusLine().getStatusCode());
}
}

public void testNonExistentEnrichPolicy() throws IOException {
ResponseException re = expectThrows(
ResponseException.class,
() -> runEsql(new RestEsqlTestCase.RequestObjectBuilder().query("from test | enrich countris").build())
);
assertThat(
EntityUtils.toString(re.getResponse().getEntity()),
containsString("unresolved enrich policy [countris], did you mean [countries]?")
);
}

public void testNonExistentEnrichPolicy_KeepField() throws IOException {
ResponseException re = expectThrows(
ResponseException.class,
() -> runEsql(new RestEsqlTestCase.RequestObjectBuilder().query("from test | enrich countris | keep number").build())
);
assertThat(
EntityUtils.toString(re.getResponse().getEntity()),
containsString("unresolved enrich policy [countris], did you mean [countries]?")
);
}

public void testMatchField_ImplicitFieldsList() throws IOException {
Map<String, Object> result = runEsql(
new RestEsqlTestCase.RequestObjectBuilder().query("from test | enrich countries | keep number").build()
);
var columns = List.of(Map.of("name", "number", "type", "long"));
var values = List.of(List.of(1000), List.of(1000), List.of(5000));

assertMap(result, matchesMap().entry("columns", columns).entry("values", values));
}

public void testMatchField_ImplicitFieldsList_WithStats() throws IOException {
Map<String, Object> result = runEsql(
new RestEsqlTestCase.RequestObjectBuilder().query("from test | enrich countries | stats s = sum(number) by country_name")
.build()
);
var columns = List.of(Map.of("name", "s", "type", "long"), Map.of("name", "country_name", "type", "keyword"));
var values = List.of(List.of(2000, "United States of America"), List.of(5000, "China"));

assertMap(result, matchesMap().entry("columns", columns).entry("values", values));
}

@Override
protected boolean preserveClusterUponCompletion() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,7 @@ private LogicalPlan resolveEnrich(Enrich enrich, List<Attribute> childrenOutput)
}
if (resolved.resolved() && resolved.dataType() != KEYWORD) {
resolved = ua.withUnresolvedMessage(
"Unsupported type ["
+ resolved.dataType()
+ "] for enrich matching field ["
+ ua.name()
+ "]; only KEYWORD allowed"
"Unsupported type [" + resolved.dataType() + "] for enrich matching field [" + ua.name() + "]; only KEYWORD allowed"
);
}
return new Enrich(enrich.source(), enrich.child(), enrich.policyName(), resolved, enrich.policy(), enrich.enrichFields());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -151,22 +150,33 @@ private <T> void preAnalyze(LogicalPlan parsed, BiFunction<IndexResolution, Enri
PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
Set<String> policyNames = new HashSet<>(preAnalysis.policyNames);
EnrichResolution resolution = new EnrichResolution(ConcurrentCollections.newConcurrentSet(), enrichPolicyResolver.allPolicyNames());
AtomicReference<IndexResolution> resolvedIndex = new AtomicReference<>();

ActionListener<Void> groupedListener = listener.delegateFailureAndWrap((l, unused) -> {
assert resolution.resolvedPolicies().size() == policyNames.size()
: resolution.resolvedPolicies().size() + " != " + policyNames.size();
assert resolvedIndex.get() != null : "index wasn't resolved";
l.onResponse(action.apply(resolvedIndex.get(), resolution));

// first we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API
var matchFields = resolution.resolvedPolicies()
.stream()
.filter(p -> p.index().isValid()) // only if the policy by the specified name was found; later the Verifier will be
// triggered
.map(p -> p.policy().getMatchField())
.collect(Collectors.toSet());

preAnalyzeIndices(
parsed,
ActionListener.wrap(indexResolution -> l.onResponse(action.apply(indexResolution, resolution)), listener::onFailure),
matchFields
);
});
try (RefCountingListener refs = new RefCountingListener(groupedListener)) {
preAnalyzeIndices(parsed, refs.acquire(resolvedIndex::set));
for (String policyName : policyNames) {
enrichPolicyResolver.resolvePolicy(policyName, refs.acquire(resolution.resolvedPolicies()::add));
}
}
}

private <T> void preAnalyzeIndices(LogicalPlan parsed, ActionListener<IndexResolution> listener) {
private <T> void preAnalyzeIndices(LogicalPlan parsed, ActionListener<IndexResolution> listener, Set<String> enrichPolicyMatchFields) {
PreAnalyzer.PreAnalysis preAnalysis = new PreAnalyzer().preAnalyze(parsed);
// TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one
if (preAnalysis.indices.size() > 1) {
Expand All @@ -176,6 +186,11 @@ private <T> void preAnalyzeIndices(LogicalPlan parsed, ActionListener<IndexResol
TableInfo tableInfo = preAnalysis.indices.get(0);
TableIdentifier table = tableInfo.id();
var fieldNames = fieldNames(parsed);

if (enrichPolicyMatchFields.isEmpty() == false && fieldNames != IndexResolver.ALL_FIELDS) {
fieldNames.addAll(enrichPolicyMatchFields);
fieldNames.addAll(subfields(enrichPolicyMatchFields));
}
indexResolver.resolveAsMergedMapping(
table.index(),
fieldNames,
Expand Down Expand Up @@ -254,9 +269,7 @@ static Set<String> fieldNames(LogicalPlan parsed) {
if (fieldNames.isEmpty()) {
return IndexResolver.ALL_FIELDS;
} else {
fieldNames.addAll(
fieldNames.stream().filter(name -> name.endsWith(WILDCARD) == false).map(name -> name + ".*").collect(Collectors.toSet())
);
fieldNames.addAll(subfields(fieldNames));
return fieldNames;
}
}
Expand All @@ -269,6 +282,10 @@ private static boolean matchByName(Attribute attr, String other, boolean skipIfP
return isPattern ? Regex.simpleMatch(attr.qualifiedName(), other) : attr.qualifiedName().equals(other);
}

private static Set<String> subfields(Set<String> names) {
return names.stream().filter(name -> name.endsWith(WILDCARD) == false).map(name -> name + ".*").collect(Collectors.toSet());
}

public void optimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan> listener) {
analyzedPlan(logicalPlan, map(listener, p -> {
var plan = logicalPlanOptimizer.optimize(p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
public class CsvTests extends ESTestCase {

private static final Logger LOGGER = LogManager.getLogger(CsvTests.class);
private static final String IGNORED_CSV_FILE_NAMES_PATTERN = "-IT_tests_only";

private final String fileName;
private final String groupName;
Expand All @@ -160,7 +161,9 @@ public class CsvTests extends ESTestCase {

@ParametersFactory(argumentFormatting = "%2$s.%3$s")
public static List<Object[]> readScriptSpec() throws Exception {
List<URL> urls = classpathResources("/*.csv-spec").stream().filter(x -> x.toString().contains("-ignoreCsvTests") == false).toList();
List<URL> urls = classpathResources("/*.csv-spec").stream()
.filter(x -> x.toString().contains(IGNORED_CSV_FILE_NAMES_PATTERN) == false)
.toList();
assertTrue("Not enough specs found " + urls, urls.size() > 0);
return SpecReader.readScriptSpec(urls, specParser());
}
Expand Down
Loading

0 comments on commit 74da807

Please sign in to comment.