Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Rename internal indexes for transform plugin #47788

Merged
merged 10 commits into from
Oct 11, 2019
Merged
Prev Previous commit
Next Next commit
use old and new indexes in rolling upgrade IT
  • Loading branch information
Hendrik Muhs committed Oct 10, 2019
commit 124d3eedbffbf63994a99ec3b1e072f7452b9cbc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED;
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_NOTIFICATIONS_INDEX_PREFIX;
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_NOTIFICATIONS_INDEX_PREFIX_DEPRECATED;
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_TASK_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand All @@ -64,8 +65,8 @@
public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {

private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
private static final String DATAFRAME_ENDPOINT = "/_transform/";
private static final String DATAFRAME_ENDPOINT_DEPRECATED = "/_data_frame/transforms/";
private static final String TRANSFORM_ENDPOINT = "/_transform/";
private static final String TRANSFORM_ENDPOINT_DEPRECATED = "/_data_frame/transforms/";
private static final String CONTINUOUS_TRANSFORM_ID = "continuous-transform-upgrade-job";
private static final String CONTINUOUS_TRANSFORM_SOURCE = "transform-upgrade-continuous-source";
private static final List<String> ENTITIES = Stream.iterate(1, n -> n + 1)
Expand Down Expand Up @@ -106,8 +107,8 @@ public void waitForTemplates() throws Exception {
});
}

protected static void waitForPendingDataFrameTasks() throws Exception {
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("data_frame/transforms") == false);
protected static void waitForPendingTransformTasks() throws Exception {
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith(TRANSFORM_TASK_NAME) == false);
}

@Override
Expand All @@ -122,8 +123,8 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE
* The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results
* index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade
*/
public void testDataFramesRollingUpgrade() throws Exception {
assumeTrue("Continuous data frames not supported until 7.3", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_3_0));
public void testTransformRollingUpgrade() throws Exception {
assumeTrue("Continuous transform not supported until 7.3", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_3_0));
Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings");
adjustLoggingLevels.setJsonEntity(
"{\"transient\": {" +
Expand All @@ -138,19 +139,19 @@ public void testDataFramesRollingUpgrade() throws Exception {
switch (CLUSTER_TYPE) {
case OLD:
client().performRequest(waitForYellow);
createAndStartContinuousDataFrame();
createAndStartContinuousTransform();
break;
case MIXED:
client().performRequest(waitForYellow);
long lastCheckpoint = 1;
if (Booleans.parseBoolean(System.getProperty("tests.first_round")) == false) {
lastCheckpoint = 2;
}
verifyContinuousDataFrameHandlesData(lastCheckpoint);
verifyContinuousTransformHandlesData(lastCheckpoint);
break;
case UPGRADED:
client().performRequest(waitForYellow);
verifyContinuousDataFrameHandlesData(3);
verifyContinuousTransformHandlesData(3);
cleanUpTransforms();
break;
default:
Expand All @@ -161,10 +162,10 @@ public void testDataFramesRollingUpgrade() throws Exception {
private void cleanUpTransforms() throws Exception {
stopTransform(CONTINUOUS_TRANSFORM_ID);
deleteTransform(CONTINUOUS_TRANSFORM_ID);
waitForPendingDataFrameTasks();
waitForPendingTransformTasks();
}

private void createAndStartContinuousDataFrame() throws Exception {
private void createAndStartContinuousTransform() throws Exception {
createIndex(CONTINUOUS_TRANSFORM_SOURCE);
long totalDocsWrittenSum = 0;
for (TimeValue bucket : BUCKETS) {
Expand Down Expand Up @@ -204,9 +205,9 @@ private void createAndStartContinuousDataFrame() throws Exception {
}

@SuppressWarnings("unchecked")
private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception {
private void verifyContinuousTransformHandlesData(long expectedLastCheckpoint) throws Exception {

// A continuous data frame should automatically become started when it gets assigned to a node
// A continuous transform should automatically become started when it gets assigned to a node
// if it was assigned to the node that was removed from the cluster
assertBusy(() -> {
TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
Expand Down Expand Up @@ -250,7 +251,11 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t
}

private void awaitWrittenIndexerState(String id, Consumer<Map<?, ?>> responseAssertion) throws Exception {
Request getStatsDocsRequest = new Request("GET", ".data-frame-internal-*/_search");
Request getStatsDocsRequest = new Request("GET",
TRANSFORM_INTERNAL_INDEX_PREFIX + "*," +
TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED + "*" +
"/_search");

getStatsDocsRequest.setJsonEntity("{\n" +
" \"query\": {\n" +
" \"bool\": {\n" +
Expand All @@ -271,7 +276,8 @@ private void awaitWrittenIndexerState(String id, Consumer<Map<?, ?>> responseAss
"}");
assertBusy(() -> {
// Want to make sure we get the latest docs
client().performRequest(new Request("POST", ".data-frame-internal-*/_refresh"));
client().performRequest(new Request("POST", TRANSFORM_INTERNAL_INDEX_PREFIX + "*/_refresh"));
client().performRequest(new Request("POST", TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED + "*/_refresh"));
Response response = client().performRequest(getStatsDocsRequest);
assertEquals(200, response.getStatusLine().getStatusCode());
Map<String, Object> responseBody = entityAsMap(response);
Expand All @@ -290,7 +296,7 @@ private void awaitWrittenIndexerState(String id, String indexerState) throws Exc
}

private String getTransformEndpoint() {
return CLUSTER_TYPE == ClusterType.UPGRADED ? DATAFRAME_ENDPOINT : DATAFRAME_ENDPOINT_DEPRECATED;
return CLUSTER_TYPE == ClusterType.UPGRADED ? TRANSFORM_ENDPOINT : TRANSFORM_ENDPOINT_DEPRECATED;
}

private void putTransform(String id, TransformConfig config) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class XPackRestTestConstants {
CONFIG_INDEX);

// Transform constants:
public static final String TRANSFORM_TASK_NAME = "data_frame/transforms";
public static final String TRANSFORM_INTERNAL_INDEX_PREFIX = ".transform-internal-";
public static final String TRANSFORM_NOTIFICATIONS_INDEX_PREFIX = ".transform-notifications-";
public static final String TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED = ".data-frame-internal-";
Expand Down