Skip to content

Commit

Permalink
Create first backing index when creating data stream (elastic#54467)
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Apr 2, 2020
1 parent db572af commit 42f513c
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
"Create data stream":
- skip:
version: " - 7.6.99"
reason: available only in 7.7+
version: " - 7.99.99"
reason: "enable in 7.8+ after back-porting https://github.com/elastic/elasticsearch/pull/54467"

- do:
indices.create_data_stream:
Expand All @@ -22,10 +22,17 @@
indices.get_data_streams: {}
- match: { 0.name: simple-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.indices: [] }
- length: { 0.indices: 1 }
- match: { 0.indices.0.index_name: 'simple-data-stream1-000001' }
- match: { 1.name: simple-data-stream2 }
- match: { 1.timestamp_field: '@timestamp2' }
- match: { 1.indices: [] }
- length: { 1.indices: 1 }
- match: { 1.indices.0.index_name: 'simple-data-stream2-000001' }

- do:
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged

- do:
indices.delete_data_stream:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
Expand All @@ -42,13 +44,14 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
Expand Down Expand Up @@ -117,10 +120,14 @@ public int hashCode() {

public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {

private final MetadataCreateIndexService metadataCreateIndexService;

@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService metaDataCreateIndexService) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.metadataCreateIndexService = metaDataCreateIndexService;
}

@Override
Expand Down Expand Up @@ -151,7 +158,7 @@ public void onFailure(String source, Exception e) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return createDataStream(currentState, request);
return createDataStream(metadataCreateIndexService, currentState, request);
}

@Override
Expand All @@ -161,16 +168,26 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

static ClusterState createDataStream(ClusterState currentState, Request request) {
static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
ClusterState currentState,
Request request) throws Exception {
if (currentState.metadata().dataStreams().containsKey(request.name)) {
throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
}

MetadataCreateIndexService.validateIndexOrAliasName(request.name,
(s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));

String firstBackingIndexName = request.name + "-000001";
CreateIndexClusterStateUpdateRequest createIndexRequest =
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
.settings(Settings.builder().put("index.hidden", true).build());
currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
assert firstBackingIndex != null;

Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));
new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex())));
logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metadata(builder).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public interface IndexAbstraction {

/**
* A write index is a dedicated concrete index, that accepts all the new documents that belong to an index abstraction.
*
* <p>
* A write index may also be a regular concrete index of a index abstraction and may therefore also be returned
* by {@link #getIndices()}. An index abstraction may also not have a dedicated write index.
*
Expand Down Expand Up @@ -87,7 +87,14 @@ enum Type {
* An alias typically refers to many concrete indices and
* may have a write index.
*/
ALIAS("alias");
ALIAS("alias"),

/**
* An index abstraction that refers to a data stream.
* A data stream typically has multiple backing indices, the latest of which
* is the target for index requests.
*/
DATA_STREAM("data_stream");

private final String displayName;

Expand Down Expand Up @@ -181,7 +188,7 @@ public boolean isHidden() {

/**
* Returns the unique alias metadata per concrete index.
*
* <p>
* (note that although alias can point to the same concrete indices, each alias reference may have its own routing
* and filters)
*/
Expand Down Expand Up @@ -233,7 +240,7 @@ public void computeAndValidateAliasProperties() {

// Validate hidden status
final Map<Boolean, List<IndexMetadata>> groupedByHiddenStatus = referenceIndexMetadatas.stream()
.collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden())));
.collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden())));
if (isNonEmpty(groupedByHiddenStatus.get(true)) && isNonEmpty(groupedByHiddenStatus.get(false))) {
List<String> hiddenOn = groupedByHiddenStatus.get(true).stream()
.map(idx -> idx.getIndex().getName()).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
Expand Down Expand Up @@ -1367,6 +1368,7 @@ private SortedMap<String, IndexAbstraction> buildIndicesLookup() {
});
}
}

aliasAndIndexLookup.values().stream()
.filter(aliasOrIndex -> aliasOrIndex.getType() == IndexAbstraction.Type.ALIAS)
.forEach(alias -> ((IndexAbstraction.Alias) alias).computeAndValidateAliasProperties());
Expand All @@ -1377,15 +1379,28 @@ private void validateDataStreams(SortedMap<String, IndexAbstraction> indicesLook
DataStreamMetadata dsMetadata = (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE);
if (dsMetadata != null) {
for (DataStream ds : dsMetadata.dataStreams().values()) {
if (indicesLookup.containsKey(ds.getName())) {
IndexAbstraction existing = indicesLookup.get(ds.getName());
if (existing != null && existing.getType() != IndexAbstraction.Type.DATA_STREAM) {
throw new IllegalStateException("data stream [" + ds.getName() + "] conflicts with existing index or alias");
}

SortedMap<?, ?> map = indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
if (map.size() != 0) {
throw new IllegalStateException("data stream [" + ds.getName() +
"] could create backing indices that conflict with " + map.size() + " existing index(s) or alias(s)" +
" including '" + map.firstKey() + "'");
SortedMap<String, IndexAbstraction> potentialConflicts =
indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
if (potentialConflicts.size() != 0) {
List<String> indexNames = ds.getIndices().stream().map(Index::getName).collect(Collectors.toList());
List<String> conflicts = new ArrayList<>();
for (Map.Entry<String, IndexAbstraction> entry : potentialConflicts.entrySet()) {
if (entry.getValue().getType() != IndexAbstraction.Type.CONCRETE_INDEX ||
indexNames.contains(entry.getKey()) == false) {
conflicts.add(entry.getKey());
}
}

if (conflicts.size() > 0) {
throw new IllegalStateException("data stream [" + ds.getName() +
"] could create backing indices that conflict with " + conflicts.size() + " existing index(s) or alias(s)" +
" including '" + conflicts.get(0) + "'");
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,30 @@
*/
package org.elasticsearch.action.admin.indices.datastream;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.util.Collections;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {

Expand Down Expand Up @@ -62,33 +72,60 @@ public void testValidateRequestWithoutTimestampField() {
assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
}

public void testCreateDataStream() {
public void testCreateDataStream() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req);
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
assertThat(newState.metadata().index(dataStreamName + "-000001"), notNullValue());
assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true"));
}

public void testCreateDuplicateDataStream() {
public void testCreateDuplicateDataStream() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);

IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
}

public void testCreateDataStreamWithInvalidName() {
public void testCreateDataStreamWithInvalidName() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "_My-da#ta- ,stream-";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("must not contain the following characters"));
}

private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))
.thenAnswer(mockInvocation -> {
ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0];
CreateIndexClusterStateUpdateRequest request = (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1];

Metadata.Builder b = Metadata.builder(currentState.metadata())
.put(IndexMetadata.builder(request.index())
.settings(Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(request.settings())
.build())
.numberOfShards(1)
.numberOfReplicas(1)
.build(), false);
return ClusterState.builder(currentState).metadata(b.build()).build();
});

return s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -939,7 +941,7 @@ public void testBuilderRejectsDataStreamThatConflictsWithAlias() {

public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
final String dataStreamName = "my-data-stream";
final String conflictingIndex = dataStreamName + "-00001";
final String conflictingIndex = dataStreamName + "-000001";
Metadata.Builder b = Metadata.builder()
.put(IndexMetadata.builder(conflictingIndex)
.settings(settings(Version.CURRENT))
Expand All @@ -953,6 +955,29 @@ public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
"] could create backing indices that conflict with 1 existing index(s) or alias(s) including '" + conflictingIndex + "'"));
}

public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() {
final String dataStreamName = "my-data-stream";
final List<Index> backingIndices = new ArrayList<>();
final int numBackingIndices = randomIntBetween(2, 5);
int lastBackingIndexNum = randomIntBetween(9, 50);
Metadata.Builder b = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, lastBackingIndexNum))
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
b.put(im, false);
backingIndices.add(im.getIndex());
lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
}

b.put(new DataStream(dataStreamName, "ts", backingIndices));
Metadata metadata = b.build();
assertThat(metadata.dataStreams().size(), equalTo(1));
assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
}

public void testSerialization() throws IOException {
final Metadata orig = randomMetadata();
final BytesStreamOutput out = new BytesStreamOutput();
Expand Down

0 comments on commit 42f513c

Please sign in to comment.