-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SQL: Add ability to perform CCS through SQL querying #78903
Conversation
This adds the ability to perform cross cluster searching through SQL. This is done by simply using the CCS-specific `cluster:index` notation, where the "cluster" maps to an SQL catalog and "index" to a table; patterns are supported for both. Noteworthy here is that a search runs either on the local or remote clusters, but not both (which is a CCS-specific behavior). Another limitation is currently that table enumeration in remote clusters will not contain aliases (not reported through field caps). Also, frozen indices are currently not listed as such (they're listed as normal indices). `SYS COLUMNS` command will not take a catalog pattern - an exact match is required -, following xDBC spec. `SYS TABLES` can, however.
License and WS fixes. Add a comment. Revert a debugging change.
@elasticmachine update branch |
Add getClusters() metadata test. Adjust expected output of other 'SHOW TABLES' tests.
Pinging @elastic/es-ql (Team:QL) |
...src/test/java/org/elasticsearch/xpack/sql/qa/multi_cluster_with_security/JdbcMetadataIT.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gave a first round of feedback. Looks good overall.
It's not clear to me what test are running against a catalog and which one are not.
SELECT FROM table
vs
SELECT FROM local:table
vs
SELECT FROM remote:table
this.client = client; | ||
this.clusterName = clusterName; | ||
this.typeRegistry = typeRegistry; | ||
this.remoteClusters = new RemoteClusterResolver(settings, clusterSettings); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testing wise I think it's better to pass a provider that returns a Set vs the ClusterSettings which is an implementation detail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've pulled this class out as standalone and passed IndexResolver just a Supplier of the set.
client.admin().indices().getAliases(aliasRequest, wrap(aliases -> | ||
resolveIndices(indices, javaRegex, aliases, retrieveIndices, retrieveFrozenIndices, listener), | ||
ex -> { | ||
// with security, two exception can be thrown: | ||
// INFE - if no alias matches | ||
// security exception is the user cannot access aliases | ||
|
||
// in both cases, that is allowed and we continue with the indices request | ||
if (ex instanceof IndexNotFoundException || ex instanceof ElasticsearchSecurityException) { | ||
resolveIndices(indices, javaRegex, null, retrieveIndices, retrieveFrozenIndices, listener); | ||
} else { | ||
listener.onFailure(ex); | ||
client.admin().indices().getAliases(aliasRequest, wrap(aliases -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you comment on what's the difference between the old code and the new one - the whole block was formatted so it's difficult to keep track.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The significant changes are: (1) decouple and defer the output filtering from index/alias collection and (2) selectively cascade the resolution: aliases -> local indices -> remote indices (-> filtering), as these are needed.
The aliases and local indices resolution haven't otherwise changed.
client.admin().indices().getIndex(indexRequest, wrap(indices -> { | ||
if (indices != null) { | ||
for (String indexName : indices.getIndices()) { | ||
boolean isFrozen = retrieveFrozenIndices | ||
&& indices.getSettings().get(indexName).getAsBoolean("index.frozen", false); | ||
indexInfos.add(new IndexInfo(clusterName, indexName, | ||
isFrozen ? IndexType.FROZEN_INDEX : IndexType.STANDARD_INDEX)); | ||
} | ||
} | ||
resolveRemoteIndices(clusterWildcard, indexWildcard, javaRegex, retrieveFrozenIndices, indexInfos, listener); | ||
}, | ||
listener::onFailure) | ||
); | ||
} else { | ||
resolveRemoteIndices(clusterWildcard, indexWildcard, javaRegex, retrieveFrozenIndices, indexInfos, listener); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This piece of code is similar to the method below (resolveRemoteIndices) - can the two be merged somehow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also thought about it, but didn't come to a solution. The flow is similar, but different request types and API and a "generified" would be more verbose eventually. But happy to apply suggestions.
sb.append(query.substring(i, j).replaceAll("(?i)(FROM)(\\s+)(\\w+|\"[^\"]+\")", | ||
"$1$2" + buildRemoteIndexName(REMOTE_CLUSTER_NAME, "$3"))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😓
Extract RemoteClusterResolver into own class and ony pass a supplier of remote clusters to the IndexResolver.
This adds a new "catalog" request parameter, to be used by the xDBC clients. It's value is whatever the set-catalog xDBC action is passed to by the application and will have ES/SQL run the query on the indicated cluster, in case the query itself doesn't spacify a cluster target (which takes precedence over the parameter).
@elasticmachine update branch |
Spotless reformat a file.
The JDBC driver now takes a new parameter, The |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall. Left some minor comments.
|
||
// first get aliases (if specified) | ||
boolean retrieveAliases = CollectionUtils.isEmpty(types) || types.contains(IndexType.ALIAS); | ||
boolean retrieveIndices = CollectionUtils.isEmpty(types) || types.contains(IndexType.STANDARD_INDEX); | ||
boolean retrieveFrozenIndices = CollectionUtils.isEmpty(types) || types.contains(IndexType.FROZEN_INDEX); | ||
|
||
String[] indices = Strings.commaDelimitedListToStringArray(indexWildcard); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've restored it. In a previous design, the non-split version was called in 3 places, but now it makes sense to have it split at the beginning. (Renamed it tho, to be able to use the same name across invocations with no conflict of not-changed code.)
; | ||
|
||
showTablesWithFrozen | ||
SHOW TABLES CATALOG 'my_remote_cluster' INCLUDE FROZEN; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice
@@ -179,7 +179,7 @@ private static FieldAttribute getFieldAttribute(String name) { | |||
} | |||
|
|||
public void testPruneSubQueryAliases() { | |||
ShowTables s = new ShowTables(EMPTY, null, null, false); | |||
ShowTables s = new ShowTables(EMPTY, null, null, null,null, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whitespace
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed.
@@ -65,9 +65,13 @@ | |||
// NB: this is password instead of pass since that's what JDBC DriverManager/tools use | |||
public static final String AUTH_PASS = "password"; | |||
|
|||
// Default catalog | |||
|
|||
public static final String CATALOG = "catalog"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
// Default catalog | ||
|
||
public static final String CATALOG = "catalog"; | ||
|
||
protected static final Set<String> OPTION_NAMES = new LinkedHashSet<>( | ||
Arrays.asList(PROPERTIES_VALIDATION, BINARY_COMMUNICATION, CONNECT_TIMEOUT, NETWORK_TIMEOUT, QUERY_TIMEOUT, PAGE_TIMEOUT, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the catalog defined on the connection configuration? This is about doing a network connection to the cluster not about its content.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value of the catalog
parameter will contain what the drivers receive from clients through the setCatalog()
/SQL_ATTR_CURRENT_CATALOG
(/USE/SET database
potentially for CLI, in the future), i.e. the context in which to execute a query if it contains no catalog itself.
@@ -256,6 +264,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws | |||
if (zoneId != null) { | |||
builder.field(TIME_ZONE_NAME, zoneId.getId()); | |||
} | |||
if (catalog != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add this code here? Not at the beginning nor at the end of the code block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to consistently keep the catalog
after zoneId
(member declaration, parameter order, code inserts), since catalog
seems as "connection relevant" as zoneId
(i.e. not really a meta config, but rather execution-relevant).
Change member access level, spacing.
Revert to splitting index string higher up in the execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a second round of comments/questions.
apply plugin: 'elasticsearch.rest-test' | ||
|
||
dependencies { | ||
testImplementation project(path: xpackModule('eql:qa:common')) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is eql
here needed? Most likely it shouldn't be needed as dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well spotted. Replaced the dependency with a minimal one (ql:test-fixtures
).
setting 'node.roles', '[data,ingest,master]' | ||
setting 'xpack.ml.enabled', 'false' | ||
setting 'xpack.watcher.enabled', 'false' | ||
setting 'xpack.security.enabled', 'false' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
security
is disabled but the test project is called multi-cluster-with-security
and you set an username and password later in the file. Doesn't look right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enabled back, thanks.
remoteClusterReg.get().getAllTransportPortURI().collect { "\"$it\"" }.toString() | ||
} | ||
setting 'cluster.remote.connections_per_cluster', "1" | ||
setting 'xpack.security.enabled', 'false' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
assertFalse(rs.next()); | ||
|
||
es.setCatalog(LOCAL_CLUSTER_NAME); | ||
expectThrows(SQLException.class, ps::executeQuery); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worth checking for the error message as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checked.
super(fileName, groupName, testName, lineNumber, randomBoolean() ? qualifyFromClause(testCase) : testCase); | ||
} | ||
|
||
// qualify the query FROM clause with the cluster name, but (crudely) skip `EXTRACT(a FROM b)` calls. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why skipping the EXTRACT
function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid changing queries to an incorrect format like SELECT ... EXTRACT(YEAR cluster:FROM date) FROM cluster:index ...
.
for (String table : tables) { | ||
String line = null; | ||
/* | ||
* Security automatically creates either a `.security` or a | ||
* `.security6` index but it might not have created the index | ||
* by the time the test runs. | ||
*/ | ||
while (line == null || line.startsWith(".security")) { | ||
while (line == null || line.contains("|.security")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change if the comment says ".security" or ".security6"? Either leave it as is, or do also change the comment, because there is a slight contradiction now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SHOW TABLES
will now have as first column the catalog and the table will be second. So for both .security
and .security6
indices, the listing will no longer startWith()
that value, but the line will contain a |.security
(and/or |.security6
).
private void cleanup() throws IOException { | ||
try { | ||
deleteTestIndex(); | ||
} catch (ResponseException e) { | ||
// while the majority of tests use the index(...) test method, few create their own index | ||
if (e.getResponse().getStatusLine().getStatusCode() != 404) { | ||
throw e; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this needed? I thought that ESRestTestCase
already does this if preserveIndicesUponCompletion()
is not overridden, which should not be the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
preserveIndicesUponCompletion()
is used to clear (or not) the local cluster. The indices in these tests are created on the remote cluster and only queried locally.
@@ -255,6 +273,8 @@ public void testNextPageWithDatetimeAndTimezoneParam() throws IOException { | |||
expected, | |||
runSql(new StringEntity(cursor(cursor).mode(mode).toString(), ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode) | |||
); | |||
|
|||
deleteIndex("test_date_timezone"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question about this operation. Why is it specifically needed if one of the base classes should do the cleanup in an @After
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
() -> runSql(randomMode(), "SELECT * FROM test JOIN other"), | ||
containsString("line 1:21: Queries with JOIN are not yet supported") | ||
() -> runSql(randomMode(), "SELECT * FROM " + indexPattern("test") + " JOIN other"), | ||
containsString(": Queries with JOIN are not yet supported") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The line
is removed because of the index pattern that now can have the cluster name in front of it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct. These tests are executed both on local cluster and with CCS so the offsets would differ.
Address review comments
Enable test cluster security
Propagate credentials to JdbcIntegrationTestCase hierarchy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
💔 Backport failed
You can use sqren/backport to manually backport by running |
This adds the ability to perform cross cluster searching through SQL. This is done by simply using the CCS-specific `cluster:index` notation, where the "cluster" maps to an SQL catalog and "index" to a table; patterns are supported for both. Noteworthy here is that a search runs either on the local or remote clusters, but not both (which is a CCS-specific behaviour). Another limitation is currently that table enumeration in remote clusters will not contain aliases (not reported through field caps). Also, frozen indices are currently not listed as such (they're listed as normal indices). `SYS COLUMNS` command will not take a catalog pattern - an exact match is required -, following xDBC spec. `SYS TABLES` can, however. This adds a new "catalog" request parameter, to be used by the xDBC clients. It's value is whatever the set-catalog xDBC action is passed to by the application and will have ES/SQL run the query on the indicated cluster, in case the query itself doesn't specify a cluster target (which takes precedence over the parameter). (cherry picked from commit 733837d)
) (#79536) * SQL: Add ability to perform CCS through SQL querying (#78903) This adds the ability to perform cross cluster searching through SQL. This is done by simply using the CCS-specific `cluster:index` notation, where the "cluster" maps to an SQL catalog and "index" to a table; patterns are supported for both. Noteworthy here is that a search runs either on the local or remote clusters, but not both (which is a CCS-specific behaviour). Another limitation is currently that table enumeration in remote clusters will not contain aliases (not reported through field caps). Also, frozen indices are currently not listed as such (they're listed as normal indices). `SYS COLUMNS` command will not take a catalog pattern - an exact match is required -, following xDBC spec. `SYS TABLES` can, however. This adds a new "catalog" request parameter, to be used by the xDBC clients. It's value is whatever the set-catalog xDBC action is passed to by the application and will have ES/SQL run the query on the indicated cluster, in case the query itself doesn't specify a cluster target (which takes precedence over the parameter). (cherry picked from commit 733837d) * Remove test cluster setting not avail in 7.x Remove 'xpack.security.autoconfiguration.enabled' setting.
The QL team added support for SQL via CCS to both 7.16 and 8.0 - #78903
This adds the ability to perform cross cluster searching through SQL.
This is done by simply using the CCS-specific
cluster:index
notation,where the "cluster" maps to an SQL catalog and "index" to a table;
patterns are supported for both. Noteworthy here is that a search runs
either on the local or remote clusters, but not both (which is a
CCS-specific behavior).
Another limitation is currently that table enumeration in remote
clusters will not contain aliases (not reported through field caps).
Also, frozen indices are currently not listed as such (they're listed as
normal indices).
SYS COLUMNS
command will not take a catalog pattern - an exact match isrequired -, following xDBC spec.
SYS TABLES
can, however.The query API now takes a new parameter,
catalog
, whose value is prefixedto the index in the query, before execution, if the index is not already qualified
(which takes precedence).