Skip to content

Commit

Permalink
fix(plugin): fix IllegalStateException thrown when is restarted after…
Browse files Browse the repository at this point in the history
… a first crash while starting

Resolves: #28
  • Loading branch information
fhussonnois committed May 22, 2020
1 parent 3165120 commit d0abca9
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ static TypedStruct fromGenericRecord(final GenericRecord record) {
* @param value the record value.
* @return a new {@link TypedValue} instance.
*/
@SuppressWarnings("unchecked")
private static TypedValue fromSchemaAndValue(final Schema schema, final Object value) {
final Schema.Type fieldType = schema.getType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public boolean hasNext() {
try {
return (nread > 0 || nread == -1) && reader.ready();
} catch (IOException e) {
LOG.error("Error while checking for remaining bytes to read", e.getLocalizedMessage());
LOG.error("Error while checking for remaining bytes to read: {}", e.getLocalizedMessage());
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ public LocalFileSystemScanner(final String sourceDirectoryPath,
} else if (cleaner instanceof BatchFileCleanupPolicy) {
this.cleaner = (BatchFileCleanupPolicy) cleaner;
} else {
throw new IllegalArgumentException("Cleaner must be one of 'FileCleanupPolicy', "
+ "'BatchFileCleanupPolicy', or the variants that are consumer aware and/or "
+ "Acknowledging"
+ " not " + cleaner.getClass().getName());
throw new IllegalArgumentException("Cleaner must be one of 'FileCleanupPolicy', "
+ "'BatchFileCleanupPolicy', or the variants that are consumer aware and/or "
+ "Acknowledging"
+ " not " + cleaner.getClass().getName());
}
this.offsetManager = offsetManager;
this.store = store;
Expand Down Expand Up @@ -281,9 +281,9 @@ public synchronized List<List<String>> partitionFilesAndGet(int maxGroups) {
sources.sort(BY_LAST_MODIFIED);

final List<String> paths = sources
.stream()
.map(SourceMetadata::absolutePath)
.collect(Collectors.toList());
.stream()
.map(SourceMetadata::absolutePath)
.collect(Collectors.toList());
partitions = ConnectorUtils.groupPartitions(paths, numGroups);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,18 @@
*/
package io.streamthoughts.kafka.connect.filepulse.source;

import static io.streamthoughts.kafka.connect.filepulse.config.TaskConfig.FILE_INPUT_PATHS_CONFIG;

import io.streamthoughts.kafka.connect.filepulse.Version;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.config.ConnectorConfig;
import io.streamthoughts.kafka.connect.filepulse.offset.OffsetStrategy;
import io.streamthoughts.kafka.connect.filepulse.offset.SimpleOffsetManager;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.scanner.FileSystemScanner;
import io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner;
import io.streamthoughts.kafka.connect.filepulse.scanner.local.FSDirectoryWalker;
import io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.CompositeFileListFilter;
import io.streamthoughts.kafka.connect.filepulse.state.FileStateBackingStore;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
Expand All @@ -45,6 +38,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static io.streamthoughts.kafka.connect.filepulse.config.TaskConfig.FILE_INPUT_PATHS_CONFIG;

/**
* The FilePulseSourceConnector.
*/
Expand Down Expand Up @@ -95,24 +96,40 @@ public void start(final Map<String, String> props) {
groupId, configs);
});

final StateBackingStore<SourceFile> store = StateBackingStoreRegistry.instance().get(groupId);

final FSDirectoryWalker directoryScanner = this.config.directoryScanner();
directoryScanner.setFilter(new CompositeFileListFilter(config.filters()));

scanner = new LocalFileSystemScanner(
final FileCleanupPolicy cleaner = config.cleanupPolicy();
final OffsetStrategy strategy = config.offsetStrategy();

final StateBackingStore<SourceFile> store = StateBackingStoreRegistry.instance().get(groupId);
if (store.isStarted()) {
throw new ConnectException(
"Fail to start new FilePulseSourceConnector instance. " +
"A Connector already exists for property internal.kafka.reporter.id=" + groupId
);
}
try {
scanner = new LocalFileSystemScanner(
config.scanDirectoryPath(),
directoryScanner,
config.cleanupPolicy(),
new SimpleOffsetManager(config.offsetStrategy()),
cleaner,
new SimpleOffsetManager(strategy),
store);

fsMonitorThread = new FileSystemMonitorThread(context, scanner, config.scanInternalMs());
fsMonitorThread.setUncaughtExceptionHandler((t, e) -> {
LOG.info("Uncaught error from file system monitoring thread [{}]", t.getName(), e);
throw new ConnectException(e);
});
fsMonitorThread.start();
fsMonitorThread = new FileSystemMonitorThread(context, scanner, config.scanInternalMs());
fsMonitorThread.setUncaughtExceptionHandler((t, e) -> {
LOG.info("Uncaught error from file system monitoring thread [{}]", t.getName(), e);
throw new ConnectException(e);
});
fsMonitorThread.start();
} catch (Exception e) {
StateBackingStoreRegistry.instance().release(groupId);
if( fsMonitorThread != null) {
fsMonitorThread.shutdown(0L);
}
throw e;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ private StateBackingStore<SourceFile> getStateStatesBackingStore() {
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public List<SourceRecord> poll() throws InterruptedException {
contextToBeCommitted = consumer.context();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
public class FileSystemMonitorThread extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(FileSystemMonitorThread.class);
private static final long SHUTDOWN_TIMEOUT = 5L;

private static final long SHUTDOWN_TIMEOUT_MS = 5000L;

private final ConnectorContext context;
private final CountDownLatch shutdownLatch;
Expand Down Expand Up @@ -98,10 +99,14 @@ public void run() {
}

void shutdown() {
shutdown(SHUTDOWN_TIMEOUT_MS);
}

void shutdown(final long timeoutMs) {
LOG.info("Shutting down thread monitoring filesystem.");
this.shutdownLatch.countDown();
try {
this.waitingLatch.await(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
this.waitingLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
LOG.error("Timeout : scan loop is not terminated yet.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void onStart(final FileContext context) {
notify(context.metadata(), context.offset(), SourceStatus.STARTED);
}


/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ public synchronized void release(final String name) {
LOG.info("Releasing access on {} instance for group {}", store.getClass().getSimpleName(), name);
final Integer ref = refs.compute(name, (k, v) -> v == null ? null : (v - 1 == 0) ? null : v -1);
if (ref == null) {
LOG.info("Stopping instance registered for group {}", store.getClass().getSimpleName(), name);
LOG.info("Stopping instance registered instance {} for group {}", store.getClass().getSimpleName(), name);
store.stop();
stores.remove(name);
}
}

public boolean has(final String name) {
boolean has(final String name) {
return stores.containsKey(name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,23 @@ private synchronized void setState(final States status) {
*/
@Override
public void start() {
if (!getState().equals(States.STARTED)) {
LOG.info("Starting {}", getBackingStoreName());
// Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
// updates can continue to occur in the background
configLog.start();
setState(States.STARTED);
LOG.info("Started {}", getBackingStoreName());
} else {
if (isStarted()) {
throw new IllegalStateException("Cannot init again.");
}
LOG.info("Starting {}", getBackingStoreName());
// Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
// updates can continue to occur in the background
configLog.start();
setState(States.STARTED);
LOG.info("Started {}", getBackingStoreName());
}

/**
* {@inheritDoc}
*/
@Override
public boolean isStarted() {
return getState().equals(States.STARTED);
}

/**
Expand Down Expand Up @@ -262,7 +269,7 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
try {
newState = serde.deserialize(value);
} catch (Exception e) {
LOG.error("Failed to read state : {}", e);
LOG.error("Failed to read state : {}", stateName, e);
return;
}
LOG.debug("Updating state for name {} : {}", stateName, newState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public interface StateBackingStore<T> {
*/
void stop();

/**
* Checks if the store is started.
*/
boolean isStarted();

/**
* Get a snapshot of the current state including.
* configurations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public void stop() {

}

@Override
public boolean isStarted() {
return true;
}

@Override
public StateSnapshot<V> snapshot() {
return state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public void stop() {
isStopped = true;
}

@Override
public boolean isStarted() {
return true;
}

@Override
public StateSnapshot<T> snapshot() {
return null;
Expand Down

0 comments on commit d0abca9

Please sign in to comment.