DRILL-7975: Connection to Splunk Drill Storage Plugin fails intermittently (apache#2278)

* DRILL-7975: Connection to Splunk Drill Storage Plugin fails intermittently

* Changes according to review

* Removing reconnectRetries from all plugins. Removing deserialization of the Splunk JSON config in tests.

* Revert CI Direct Memory property: 3200Mb -> 2500Mb
vdiravka committed Jul 27, 2021
1 parent 47f6077 commit 958d849
Showing 13 changed files with 76 additions and 20 deletions.
@@ -42,7 +42,6 @@ public DruidStoragePluginConfig(
@JsonProperty("brokerAddress") String brokerAddress,
@JsonProperty("coordinatorAddress") String coordinatorAddress,
@JsonProperty("averageRowSizeBytes") Integer averageRowSizeBytes) {

this.brokerAddress = brokerAddress;
this.coordinatorAddress = coordinatorAddress;
this.averageRowSizeBytes =
@@ -40,7 +40,7 @@
import org.apache.kudu.client.SessionConfiguration;
import org.junit.experimental.categories.Category;

@Ignore("requires remote kudu server")
@Ignore("requires remote kudu server") // TODO: can be rewritten by leveraging kudu docker container: DRILL-7977
@Category(KuduStorageTest.class)
public class TestKuduConnect extends BaseTest {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestKuduConnect.class);
11 changes: 7 additions & 4 deletions contrib/storage-splunk/README.md
@@ -3,21 +3,24 @@ This plugin enables Drill to query Splunk.

## Configuration
To connect Drill to Splunk, create a new storage plugin with the following configuration:

Splunk listens on port `8089` for its management interface, and this port must be open for Drill to query Splunk.

```json
{
"type":"splunk",
"enabled": false,
"username": "admin",
"password": "changeme",
"hostname": "localhost",
"port": 8089,
"earliestTime": "-14d",
"latestTime": "now",
"enabled": false
"reconnectRetries": 3
}
```

Splunk connections can occasionally fail:
https://github.com/splunk/splunk-sdk-java/issues/62 <br>
To work around this in Drill, specify `"reconnectRetries": 3`. This makes Drill retry the connection up to three times before giving up.
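
For illustration, the retry amounts to a fixed two-second-delay loop like the sketch below (`connectOnce` is a placeholder, not part of the plugin API; the real logic lives in `SplunkConnection.connect()`):

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: with "reconnectRetries": 3 the connection is attempted
// up to three times, pausing two seconds after each failure; the last failure is rethrown.
public class ReconnectSketch {

  public static void main(String[] args) throws Exception {
    int attemptsLeft = 3; // value of "reconnectRetries"
    while (true) {
      attemptsLeft--;
      try {
        connectOnce();     // placeholder for Service.connect(loginArgs)
        break;             // connected, stop retrying
      } catch (Exception e) {
        if (attemptsLeft <= 0) {
          throw e;         // attempts exhausted, surface the error
        }
        TimeUnit.SECONDS.sleep(2); // fixed delay before the next attempt
      }
    }
  }

  private static void connectOnce() throws Exception {
    // stand-in for the real Splunk SDK call
  }
}
```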

## Understanding Splunk's Data Model
Splunk's primary use case is analyzing event logs with a timestamp. As such, data is indexed by the timestamp, with the most recent data being indexed first. By default, Splunk
@@ -30,6 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
* This class wraps the functionality of the Splunk connection for Drill.
*/
@@ -41,11 +43,13 @@ public class SplunkConnection {
private final String hostname;
private final int port;
private Service service;
private int connectionAttempts;

public SplunkConnection(SplunkPluginConfig config) {
this.credentials = config.getUsernamePasswordCredentials();
this.hostname = config.getHostname();
this.port = config.getPort();
this.connectionAttempts = config.getReconnectRetries();
service = connect();
ConfCollection confs = service.getConfs();
}
@@ -71,10 +75,18 @@ public Service connect() {
loginArgs.setPort(port);
loginArgs.setPassword(credentials.getPassword());
loginArgs.setUsername(credentials.getUsername());

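// Each call to connect() consumes one attempt; on failure it waits two seconds
// and recurses until the configured number of attempts is exhausted.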
try {
connectionAttempts--;
service = Service.connect(loginArgs);
} catch (Exception e) {
if (connectionAttempts > 0) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException interruptedException) {
logger.error("Unable to wait 2 secs before next connection trey to Splunk");
}
return connect();
}
throw UserException
.connectionError()
.message("Unable to connect to Splunk at %s:%s", hostname, port)
@@ -34,12 +34,14 @@
public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {

public static final String NAME = "splunk";
public static final int DISABLED_RECONNECT_RETRIES = 1;

private final String hostname;
private final String earliestTime;
private final String latestTime;

private final int port;
private final Integer reconnectRetries;

@JsonCreator
public SplunkPluginConfig(@JsonProperty("username") String username,
@@ -48,13 +50,15 @@ public SplunkPluginConfig(@JsonProperty("username") String username,
@JsonProperty("port") int port,
@JsonProperty("earliestTime") String earliestTime,
@JsonProperty("latestTime") String latestTime,
@JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
@JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
@JsonProperty("reconnectRetries") Integer reconnectRetries) {
super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
credentialsProvider == null);
this.hostname = hostname;
this.port = port;
this.earliestTime = earliestTime;
this.latestTime = latestTime == null ? "now" : latestTime;
this.reconnectRetries = reconnectRetries;
}

@JsonIgnore
@@ -98,6 +102,10 @@ public String getLatestTime() {
return latestTime;
}

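// When "reconnectRetries" is absent from the plugin config, fall back to a single connection attempt (no retries).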
@JsonProperty("reconnectRetries")
public int getReconnectRetries() {
return reconnectRetries != null ? reconnectRetries : DISABLED_RECONNECT_RETRIES;
}

@Override
public boolean equals(Object that) {
@@ -2,13 +2,14 @@
"storage":{
"splunk" : {
"type":"splunk",
"enabled": false,
"username": "admin",
"password": "changeme",
"hostname": "localhost",
"port": 8089,
"earliestTime": "-14d",
"latestTime": "now",
"enabled": false
"reconnectRetries": 2
}
}
}
@@ -49,7 +49,8 @@ public void testConnectionFail() {
SPLUNK_STORAGE_PLUGIN_CONFIG.getPort(),
SPLUNK_STORAGE_PLUGIN_CONFIG.getEarliestTime(),
SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(),
null
null,
SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries()
);
SplunkConnection sc = new SplunkConnection(invalidSplunkConfig);
sc.connect();
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.store.splunk;

import com.splunk.Service;
import com.splunk.ServiceArgs;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
@@ -30,10 +32,14 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runners.MethodSorters;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import static org.apache.drill.exec.store.splunk.SplunkTestSuite.SPLUNK_STORAGE_PLUGIN_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;

@FixMethodOrder(MethodSorters.JVM)
@Category({SlowTest.class})
@@ -290,4 +296,25 @@ public void testSerDe() throws Exception {
int cnt = queryBuilder().physical(plan).singletonInt();
assertEquals("Counts should match", 1, cnt);
}

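// Static mocking of Service.connect() via MockedStatic requires the mockito-inline
// artifact, which this commit adds to the root pom.xml.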
@Test
public void testReconnectRetries() {
try (MockedStatic<Service> splunk = Mockito.mockStatic(Service.class)) {
ServiceArgs loginArgs = new ServiceArgs();
loginArgs.setHost(SPLUNK_STORAGE_PLUGIN_CONFIG.getHostname());
loginArgs.setPort(SPLUNK_STORAGE_PLUGIN_CONFIG.getPort());
loginArgs.setPassword(SPLUNK_STORAGE_PLUGIN_CONFIG.getPassword());
loginArgs.setUsername(SPLUNK_STORAGE_PLUGIN_CONFIG.getUsername());
splunk.when(() -> Service.connect(loginArgs))
.thenThrow(new RuntimeException("Fail first connection to Splunk"))
.thenThrow(new RuntimeException("Fail second connection to Splunk"))
.thenThrow(new RuntimeException("Fail third connection to Splunk"))
.thenReturn(new Service(loginArgs)); // fourth connection is successful
new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG); // would fail here if "reconnectRetries": 1 were configured
splunk.verify(
() -> Service.connect(loginArgs),
times(4)
);
}
}
}
@@ -71,7 +71,8 @@ public static void initSplunk() throws Exception {
String hostname = splunk.getHost();
Integer port = splunk.getFirstMappedPort();
StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
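// reconnectRetries is set to 4 so that testReconnectRetries can absorb three mocked connection failures before succeeding.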
SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now", null);
SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now",
null, 4);
SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true);
pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG);
runningSuite = true;
@@ -67,7 +67,7 @@ public SqlNode rewrite(SqlNode sqlNode) throws ForemanSetupException {

if (schemaPlus == null) {
throw UserException.validationError()
.message(String.format("Invalid schema name [%s]", SchemaUtilites.getSchemaPath(schemaNames)))
.message("Invalid schema name [%s]", SchemaUtilites.getSchemaPath(schemaNames))
.build(logger);
}

@@ -22,7 +22,6 @@
public class InfoSchemaConfig extends StoragePluginConfig {

public static final String NAME = "ischema";

public static final InfoSchemaConfig INSTANCE = new InfoSchemaConfig();

@Override
@@ -25,16 +25,15 @@ public abstract class AbstractSecuredStoragePluginConfig extends StoragePluginCo
protected final CredentialsProvider credentialsProvider;
protected boolean directCredentials;

public AbstractSecuredStoragePluginConfig() {
this(PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, true);
}

public AbstractSecuredStoragePluginConfig(CredentialsProvider credentialsProvider, boolean directCredentials) {
this.credentialsProvider = credentialsProvider;
this.directCredentials = directCredentials;
}

public AbstractSecuredStoragePluginConfig() {
this.credentialsProvider = PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
this.directCredentials = true;
}

public CredentialsProvider getCredentialsProvider() {
if (directCredentials) {
return null;
10 changes: 8 additions & 2 deletions pom.xml
@@ -77,7 +77,7 @@
<wiremock.standalone.version>2.23.2</wiremock.standalone.version>
<jmockit.version>1.47</jmockit.version>
<logback.version>1.2.3</logback.version>
<mockito.version>3.11.0</mockito.version>
<mockito.version>3.11.2</mockito.version>
<!--
Currently Hive storage plugin only supports Apache Hive 3.1.2 or vendor specific variants of the
Apache Hive 2.3.2. If the version is changed, make sure the jars and their dependencies are updated,
@@ -1117,7 +1117,13 @@
long as Mockito _contains_ older Hamcrest classes. See DRILL-2130. -->
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.23.4</version>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
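<!-- mockito-inline supplies the inline mock maker needed for MockedStatic (used by the Splunk reconnect test) -->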
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
