Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Cluster & XPack clients from HLRC (#83423) #83593

Merged
merged 12 commits into from
Feb 10, 2022
Next Next commit
Remove Cluster & XPack clients from HLRC (#83423)
  • Loading branch information
gmarouli committed Feb 7, 2022
commit 4c62643c301ca2f25fcbc5f2d8e2017e9e78aa5b
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,9 @@ public class RestHighLevelClient implements Closeable {
private volatile ListenableFuture<Optional<String>> versionValidationFuture;

private final IndicesClient indicesClient = new IndicesClient(this);
private final ClusterClient clusterClient = new ClusterClient(this);
private final IngestClient ingestClient = new IngestClient(this);
private final SnapshotClient snapshotClient = new SnapshotClient(this);
private final TasksClient tasksClient = new TasksClient(this);
private final XPackClient xPackClient = new XPackClient(this);
private final MigrationClient migrationClient = new MigrationClient(this);
private final MachineLearningClient machineLearningClient = new MachineLearningClient(this);
private final SecurityClient securityClient = new SecurityClient(this);
Expand Down Expand Up @@ -368,15 +366,6 @@ public final IndicesClient indices() {
return indicesClient;
}

/**
* Provides a {@link ClusterClient} which can be used to access the Cluster API.
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster.html">Cluster API on elastic.co</a>
*/
public final ClusterClient cluster() {
return clusterClient;
}

/**
* Provides a {@link IngestClient} which can be used to access the Ingest API.
*
Expand Down Expand Up @@ -404,19 +393,6 @@ public final TasksClient tasks() {
return tasksClient;
}

/**
* Provides methods for accessing the Elastic Licensed X-Pack Info
* and Usage APIs that are shipped with the default distribution of
* Elasticsearch. All of these APIs will 404 if run against the OSS
* distribution of Elasticsearch.
* <p>
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/info-api.html">
* Info APIs on elastic.co</a> for more information.
*/
public final XPackClient xpack() {
return xPackClient;
}

/**
* A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Searchable Snapshots APIs.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -114,7 +113,7 @@ static List<HttpHost> parseHosts(String props) {
public static void configureRemoteClusters(List<Node> remoteNodes) throws Exception {
assertThat(remoteNodes, hasSize(3));
final String remoteClusterSettingPrefix = "cluster.remote." + CLUSTER_ALIAS + ".";
try (RestHighLevelClient localClient = newLocalClient()) {
try (RestClient localClient = newLocalClient().getLowLevelClient()) {
final Settings remoteConnectionSettings;
if (randomBoolean()) {
final List<String> seeds = remoteNodes.stream()
Expand All @@ -137,13 +136,9 @@ public static void configureRemoteClusters(List<Node> remoteNodes) throws Except
.put(remoteClusterSettingPrefix + "proxy_address", proxyNode.transportAddress)
.build();
}
assertTrue(
localClient.cluster()
.putSettings(new ClusterUpdateSettingsRequest().persistentSettings(remoteConnectionSettings), RequestOptions.DEFAULT)
.isAcknowledged()
);
updateClusterSettings(localClient, remoteConnectionSettings);
assertBusy(() -> {
final Response resp = localClient.getLowLevelClient().performRequest(new Request("GET", "/_remote/info"));
final Response resp = localClient.performRequest(new Request("GET", "/_remote/info"));
assertOK(resp);
final ObjectPath objectPath = ObjectPath.createFromResponse(resp);
assertNotNull(objectPath.evaluate(CLUSTER_ALIAS));
Expand Down Expand Up @@ -172,7 +167,7 @@ static int indexDocs(RestHighLevelClient client, String index, int numDocs) thro
}

void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs, Integer preFilterShardSize) {
try (RestHighLevelClient localClient = newLocalClient()) {
try (RestClient localClient = newLocalClient().getLowLevelClient()) {
Request request = new Request("POST", "/_search");
final int expectedDocs;
if (randomBoolean()) {
Expand All @@ -193,7 +188,7 @@ void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int r
}
int size = between(1, 100);
request.setJsonEntity("{\"sort\": \"f\", \"size\": " + size + "}");
Response response = localClient.getLowLevelClient().performRequest(request);
Response response = localClient.performRequest(request);
try (
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.SecureString;
Expand All @@ -28,6 +27,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.function.Consumer;

@SuppressWarnings("removal")
public abstract class AbstractMultiClusterRemoteTestCase extends ESRestTestCase {
Expand Down Expand Up @@ -58,8 +58,12 @@ public void initClientsAndConfigureClusters() throws Exception {
cluster1Client = buildClient("localhost:" + getProperty("test.fixtures.elasticsearch-" + getDistribution() + "-1.tcp.9200"));
cluster2Client = buildClient("localhost:" + getProperty("test.fixtures.elasticsearch-" + getDistribution() + "-2.tcp.9200"));

cluster1Client().cluster().health(new ClusterHealthRequest().waitForNodes("1").waitForYellowStatus(), RequestOptions.DEFAULT);
cluster2Client().cluster().health(new ClusterHealthRequest().waitForNodes("1").waitForYellowStatus(), RequestOptions.DEFAULT);
Consumer<Request> waitForYellowRequest = request -> {
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_nodes", "1");
};
ensureHealth(cluster1Client().getLowLevelClient(), waitForYellowRequest);
ensureHealth(cluster2Client().getLowLevelClient(), waitForYellowRequest);

initialized = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@
*/
package org.elasticsearch.cluster.remote.test;

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.cluster.RemoteConnectionInfo;
import org.elasticsearch.client.cluster.RemoteInfoRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xcontent.XContentFactory;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assume.assumeThat;
Expand Down Expand Up @@ -74,27 +74,22 @@ public void clearIndices() throws IOException {

@After
public void clearRemoteClusterSettings() throws IOException {
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().putNull("cluster.remote.*").build()
);
assertTrue(cluster1Client().cluster().putSettings(request, RequestOptions.DEFAULT).isAcknowledged());
assertTrue(cluster2Client().cluster().putSettings(request, RequestOptions.DEFAULT).isAcknowledged());
Settings setting = Settings.builder().putNull("cluster.remote.*").build();
updateClusterSettings(cluster1Client().getLowLevelClient(), setting);
updateClusterSettings(cluster2Client().getLowLevelClient(), setting);
}

public void testProxyModeConnectionWorks() throws IOException {
String cluster2RemoteClusterSeed = "elasticsearch-" + getDistribution() + "-2:9300";
logger.info("Configuring remote cluster [{}]", cluster2RemoteClusterSeed);
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder()
.put("cluster.remote.cluster2.mode", "proxy")
.put("cluster.remote.cluster2.proxy_address", cluster2RemoteClusterSeed)
.build()
);
assertTrue(cluster1Client().cluster().putSettings(request, RequestOptions.DEFAULT).isAcknowledged());
Settings settings = Settings.builder()
.put("cluster.remote.cluster2.mode", "proxy")
.put("cluster.remote.cluster2.proxy_address", cluster2RemoteClusterSeed)
.build();

updateClusterSettings(cluster1Client().getLowLevelClient(), settings);

RemoteConnectionInfo rci = cluster1Client().cluster().remoteInfo(new RemoteInfoRequest(), RequestOptions.DEFAULT).getInfos().get(0);
logger.info("Connection info: {}", rci);
assertTrue(rci.isConnected());
assertTrue(isConnected(cluster1Client().getLowLevelClient()));

assertEquals(
2L,
Expand All @@ -105,33 +100,25 @@ public void testProxyModeConnectionWorks() throws IOException {
public void testSniffModeConnectionFails() throws IOException {
String cluster2RemoteClusterSeed = "elasticsearch-" + getDistribution() + "-2:9300";
logger.info("Configuring remote cluster [{}]", cluster2RemoteClusterSeed);
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder()
.put("cluster.remote.cluster2alt.mode", "sniff")
.put("cluster.remote.cluster2alt.seeds", cluster2RemoteClusterSeed)
.build()
);
assertTrue(cluster1Client().cluster().putSettings(request, RequestOptions.DEFAULT).isAcknowledged());
Settings settings = Settings.builder()
.put("cluster.remote.cluster2alt.mode", "sniff")
.put("cluster.remote.cluster2alt.seeds", cluster2RemoteClusterSeed)
.build();
updateClusterSettings(cluster1Client().getLowLevelClient(), settings);

RemoteConnectionInfo rci = cluster1Client().cluster().remoteInfo(new RemoteInfoRequest(), RequestOptions.DEFAULT).getInfos().get(0);
logger.info("Connection info: {}", rci);
assertFalse(rci.isConnected());
assertFalse(isConnected(cluster1Client().getLowLevelClient()));
}

public void testHAProxyModeConnectionWorks() throws IOException {
String proxyAddress = "haproxy:9600";
logger.info("Configuring remote cluster [{}]", proxyAddress);
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder()
.put("cluster.remote.haproxynosn.mode", "proxy")
.put("cluster.remote.haproxynosn.proxy_address", proxyAddress)
.build()
);
assertTrue(cluster1Client().cluster().putSettings(request, RequestOptions.DEFAULT).isAcknowledged());
Settings settings = Settings.builder()
.put("cluster.remote.haproxynosn.mode", "proxy")
.put("cluster.remote.haproxynosn.proxy_address", proxyAddress)
.build();
updateClusterSettings(cluster1Client().getLowLevelClient(), settings);

RemoteConnectionInfo rci = cluster1Client().cluster().remoteInfo(new RemoteInfoRequest(), RequestOptions.DEFAULT).getInfos().get(0);
logger.info("Connection info: {}", rci);
assertTrue(rci.isConnected());
assertTrue(isConnected(cluster1Client().getLowLevelClient()));

assertEquals(
2L,
Expand All @@ -142,18 +129,14 @@ public void testHAProxyModeConnectionWorks() throws IOException {
public void testHAProxyModeConnectionWithSNIToCluster1Works() throws IOException {
assumeThat("test is only supported if the distribution contains xpack", getDistribution(), equalTo("default"));

ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder()
.put("cluster.remote.haproxysni1.mode", "proxy")
.put("cluster.remote.haproxysni1.proxy_address", "haproxy:9600")
.put("cluster.remote.haproxysni1.server_name", "application1.example.com")
.build()
);
assertTrue(cluster2Client().cluster().putSettings(request, RequestOptions.DEFAULT).isAcknowledged());
Settings settings = Settings.builder()
.put("cluster.remote.haproxysni1.mode", "proxy")
.put("cluster.remote.haproxysni1.proxy_address", "haproxy:9600")
.put("cluster.remote.haproxysni1.server_name", "application1.example.com")
.build();
updateClusterSettings(cluster2Client().getLowLevelClient(), settings);

RemoteConnectionInfo rci = cluster2Client().cluster().remoteInfo(new RemoteInfoRequest(), RequestOptions.DEFAULT).getInfos().get(0);
logger.info("Connection info: {}", rci);
assertTrue(rci.isConnected());
assertTrue(isConnected(cluster2Client().getLowLevelClient()));

assertEquals(
1L,
Expand All @@ -164,22 +147,30 @@ public void testHAProxyModeConnectionWithSNIToCluster1Works() throws IOException
public void testHAProxyModeConnectionWithSNIToCluster2Works() throws IOException {
assumeThat("test is only supported if the distribution contains xpack", getDistribution(), equalTo("default"));

ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder()
.put("cluster.remote.haproxysni2.mode", "proxy")
.put("cluster.remote.haproxysni2.proxy_address", "haproxy:9600")
.put("cluster.remote.haproxysni2.server_name", "application2.example.com")
.build()
);
assertTrue(cluster1Client().cluster().putSettings(request, RequestOptions.DEFAULT).isAcknowledged());
Settings settings = Settings.builder()
.put("cluster.remote.haproxysni2.mode", "proxy")
.put("cluster.remote.haproxysni2.proxy_address", "haproxy:9600")
.put("cluster.remote.haproxysni2.server_name", "application2.example.com")
.build();
updateClusterSettings(cluster1Client().getLowLevelClient(), settings);

RemoteConnectionInfo rci = cluster1Client().cluster().remoteInfo(new RemoteInfoRequest(), RequestOptions.DEFAULT).getInfos().get(0);
logger.info("Connection info: {}", rci);
assertTrue(rci.isConnected());
assertTrue(isConnected(cluster1Client().getLowLevelClient()));

assertEquals(
2L,
cluster1Client().search(new SearchRequest("haproxysni2:test2"), RequestOptions.DEFAULT).getHits().getTotalHits().value
);
}

@SuppressWarnings("unchecked")
private boolean isConnected(RestClient restClient) throws IOException {
Optional<Object> remoteConnectionInfo = getAsMap(restClient, "/_remote/info").values().stream().findFirst();
if (remoteConnectionInfo.isPresent()) {
logger.info("Connection info: {}", remoteConnectionInfo);
if (((Map<String, Object>) remoteConnectionInfo.get()).get("connected")instanceof Boolean connected) {
return connected;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,28 @@ public static void assertAcknowledged(Response response) throws IOException {
assertThat(jsonBody, containsString("\"acknowledged\":true"));
}

/**
* Updates the cluster with the provided settings (as persistent settings)
**/
public static void updateClusterSettings(Settings settings) throws IOException {
updateClusterSettings(client(), settings);
}

/**
* Updates the cluster with the provided settings (as persistent settings)
**/
public static void updateClusterSettings(RestClient client, Settings settings) throws IOException {
Request request = new Request("PUT", "/_cluster/settings");
String entity = String.format("""
{
"persistent":%s
}
""", Strings.toString(settings));
request.setJsonEntity(entity);
Response response = client.performRequest(request);
assertOK(response);
}

/**
* Permits subclasses to increase the default timeout when waiting for green health
*/
Expand Down Expand Up @@ -1440,6 +1462,10 @@ public static void ensureHealth(String index, Consumer<Request> requestConsumer)
ensureHealth(client(), index, requestConsumer);
}

public static void ensureHealth(RestClient restClient, Consumer<Request> requestConsumer) throws IOException {
ensureHealth(restClient, "", requestConsumer);
}

protected static void ensureHealth(RestClient restClient, String index, Consumer<Request> requestConsumer) throws IOException {
Request request = new Request("GET", "/_cluster/health" + (index.isBlank() ? "" : "/" + index));
requestConsumer.accept(request);
Expand Down Expand Up @@ -1604,7 +1630,11 @@ protected static Map<String, Object> getAlias(final String index, final String a
}

protected static Map<String, Object> getAsMap(final String endpoint) throws IOException {
Response response = client().performRequest(new Request("GET", endpoint));
return getAsMap(client(), endpoint);
}

protected static Map<String, Object> getAsMap(RestClient client, final String endpoint) throws IOException {
Response response = client.performRequest(new Request("GET", endpoint));
return responseAsMap(response);
}

Expand Down
Loading