test proving we have an offsets problem (#18)
* Rework Sink to properly handle offsets given to us by Connect
* num_records was being calculated incorrectly. might affect source replay
iamnoah authored Dec 1, 2016
1 parent 0d5df73 commit 8aedb8c
Showing 9 changed files with 221 additions and 166 deletions.
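
The second note in the commit message refers to BlockGZIPFileWriter, which previously bumped its per-chunk record count by the number of byte arrays passed to write(). That count is wrong whenever a format emits byte chunks that are not one-per-record, for example the trailer that the reworked sink below writes via format.finish() with a record count of zero. The following is a minimal sketch of the corrected bookkeeping; the Chunk fields mirror the diff, while the class name and main method are illustrative only:

    import java.util.Arrays;
    import java.util.List;

    // Sketch of the numRecords fix: the caller states how many records the
    // byte chunks represent instead of the writer assuming one chunk == one record.
    class ChunkCountSketch {
        static class Chunk {
            long rawBytes = 0;
            long numRecords = 0;
        }

        private final Chunk chunk = new Chunk();

        void write(List<byte[]> toWrite, long recordCount) {
            for (byte[] bytes : toWrite) {
                chunk.rawBytes += bytes.length;
            }
            chunk.numRecords += recordCount; // was: chunk.numRecords += toWrite.size()
        }

        public static void main(String[] args) {
            ChunkCountSketch w = new ChunkCountSketch();
            w.write(Arrays.asList("a\n".getBytes(), "b\n".getBytes()), 2); // two records
            w.write(Arrays.asList("trailer\n".getBytes()), 0);             // trailer bytes, zero records
            // The old size-based count would report 3 records here instead of 2.
            System.out.println(w.chunk.numRecords); // prints 2
        }
    }

The write() signature change and the matching test updates appear in the diffs below.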
1 change: 0 additions & 1 deletion circle.yml
@@ -4,7 +4,6 @@ machine:

dependencies:
pre:
- ./gradlew classes testClasses
- docker pull lphoward/fake-s3 # pre-cache for integration tests

test:
BlockGZIPFileWriter.java
@@ -9,9 +9,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPOutputStream;

import org.apache.kafka.connect.errors.RetriableException;
@@ -43,11 +41,6 @@ public class BlockGZIPFileWriter implements Closeable {
private GZIPOutputStream gzipStream;
private CountingOutputStream fileStream;
private final ObjectMapper objectMapper = new ObjectMapper();
private final Map<String, String> tags;

public Map<String, String> tags() {
return tags;
}

private class Chunk {
public long rawBytes = 0;
@@ -116,16 +109,15 @@ public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset
}

public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException {
this(filenameBase, path, firstRecordOffset, chunkThreshold, new byte[0], new HashMap<>());
this(filenameBase, path, firstRecordOffset, chunkThreshold, new byte[0]);
}

public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold, byte[] header, Map<String, String> tags)
public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold, byte[] header)
throws IOException {
this.filenameBase = filenameBase;
this.path = path;
this.firstRecordOffset = firstRecordOffset;
this.chunkThreshold = chunkThreshold;
this.tags = tags;

chunks = new ArrayList<>();

@@ -183,7 +175,12 @@ public String getIndexFilePath() {
}


public void write(List<byte[]> toWrite) throws IOException {
/**
*
* @param toWrite the bytes to write.
* @param recordCount how many records these bytes represent.
*/
public void write(List<byte[]> toWrite, long recordCount) throws IOException {
Chunk ch = currentChunk();

int rawBytesToWrite = 0;
@@ -207,7 +204,7 @@ public void write(List<byte[]> toWrite) throws IOException {
}

ch.rawBytes += rawBytesToWrite;
ch.numRecords += toWrite.size();
ch.numRecords += recordCount;
}

public void delete() {
@@ -245,7 +242,7 @@ public void close() throws IOException {
private void writeIndex() throws IOException {
File indexFile = new File(getIndexFilePath());
if (!indexFile.getParentFile().exists() && !indexFile.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Cannot create index " + indexFile);
throw new IOException("Cannot create index " + indexFile);
}

objectMapper.writer().writeValue(indexFile, ChunksIndex.of(chunks.stream()
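
For reference, here is a short usage sketch of the writer after this change: the tags map is gone from the constructors and write() now takes an explicit record count. It assumes BlockGZIPFileWriter is on the classpath; the buffer directory, file name, and record bytes are made up for illustration, and only the constructor, write(), and close() calls come from the diff above.

    import java.io.IOException;
    import java.util.Arrays;

    public class WriterUsageSketch {
        public static void main(String[] args) throws IOException {
            // 64 MiB chunk threshold, matching the sink's default of 67108864 bytes.
            BlockGZIPFileWriter writer =
                new BlockGZIPFileWriter("example-00000", "/tmp/connect-buffer", 0, 64 * 1024 * 1024);
            try {
                // Three records encoded as three byte chunks, so the count matches the list size.
                writer.write(Arrays.asList(
                    "record 0\n".getBytes(),
                    "record 1\n".getBytes(),
                    "record 2\n".getBytes()), 3);
            } finally {
                writer.close(); // finalizes the gzip stream; the chunk index file sits alongside the data file
            }
        }
    }

On flush, the sink uploads both getDataFilePath() and getIndexFilePath() for the partition and then deletes the temp file, as shown in PartitionWriter.done() below.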
209 changes: 95 additions & 114 deletions sink/src/main/java/com/spredfast/kafka/connect/s3/sink/S3SinkTask.java
@@ -4,10 +4,11 @@
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

@@ -38,8 +39,7 @@ public class S3SinkTask extends SinkTask {

private Map<String, String> config;

private final Map<TopicPartition, BlockGZIPFileWriter> tmpFiles = new HashMap<>();
private final Map<TopicPartition, S3RecordsWriter> writers = new HashMap<>();
private final Map<TopicPartition, PartitionWriter> partitions = new LinkedHashMap<>();

private long GZIPChunkThreshold = 67108864;

@@ -95,156 +95,137 @@ private Optional<String> configGet(String key) {

@Override
public void stop() throws ConnectException {
// delete our temp files
for (BlockGZIPFileWriter writer : tmpFiles.values()) {
// ensure we delete our temp files
for (PartitionWriter writer : partitions.values()) {
log.debug("{} Stopping - Deleting temp file {}", name(), writer.getDataFilePath());
writer.delete();
}
}


private void writeAll(Collection<SinkRecord> records, BlockGZIPFileWriter buffer, S3RecordsWriter writer) {
metrics.hist(records.size(), "putSize", buffer.tags());
try (Metrics.StopTimer ignored = metrics.time("writeAll", buffer.tags())) {
buffer.write(writer.writeBatch(records.stream().map(record -> new ProducerRecord<>(record.topic(), record.kafkaPartition(),
keyConverter.map(c -> c.fromConnectData(record.topic(), record.keySchema(), record.key()))
.orElse(null),
valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value())
))).collect(toList()));
} catch (IOException e) {
throw new RetriableException("Failed to write to buffer", e);
}
}

@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
records.stream().collect(groupingBy(record -> new TopicPartition(record.topic(), record.kafkaPartition()))).forEach((tp, rs) -> {
BlockGZIPFileWriter buffer = tmpFiles.get(tp);
if (buffer == null) {
log.error("Trying to put {} records to partition {} which doesn't exist yet", records.size(), tp);
throw new ConnectException("Trying to put records for a topic partition that has not be assigned");
}
long firstOffset = rs.get(0).kafkaOffset();
long lastOffset = rs.get(rs.size() - 1).kafkaOffset();

log.debug("{} received {} records for {} to archive. Last offset {}", name(), rs.size(), tp,
rs.get(rs.size() - 1).kafkaOffset());
writerGet(tp).ifPresent(w -> writeAll(rs, buffer, w));
lastOffset);

PartitionWriter writer = partitions.computeIfAbsent(tp,
t -> initWriter(t, firstOffset));
writer.writeAll(rs);
});
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) throws ConnectException {
Metrics.StopTimer timer = metrics.time("flush", tags);
// Don't rely on offsets passed. They have some quirks like including topic partitions that just
// got revoked (i.e. we have deleted the writer already). Not sure if this is intended...
// https://twitter.com/mr_paul_banks/status/702493772983177218
log.debug("{} flushing offsets", name());

// Instead iterate over the writers we do have and get the offsets directly from them.
for (Map.Entry<TopicPartition, BlockGZIPFileWriter> entry : tmpFiles.entrySet()) {
TopicPartition tp = entry.getKey();
BlockGZIPFileWriter writer = entry.getValue();
if (writer.getNumRecords() == 0) {
continue;
}
try {
Metrics.StopTimer time = metrics.time("s3Put", writer.tags());

finishWriter(writer, tp);
long nextOffset = s3.putChunk(writer.getDataFilePath(), writer.getIndexFilePath(), tp);

log.debug("{} Finished {} to {} - Deleting temp file {}", name(), tp, nextOffset, writer.getDataFilePath());
writer.delete();

time.stop();
log.debug("{} flushing offsets", name());

// Now reset writer to a new one
initWriter(tp, nextOffset);
// XXX the docs for flush say that the offsets given are the same as if we tracked the offsets
// of the records given to put, so we should just write whatever we have in our files
offsets.keySet().stream()
.map(partitions::get)
.filter(p -> p != null) // TODO error/warn?
.forEach(PartitionWriter::done);

log.debug("{} successfully uploaded {} records for {} as {} now at offset {}", name(), writer.getNumRecords(), tp,
writer.getDataFileName(), nextOffset);
} catch (FileNotFoundException fnf) {
throw new ConnectException("Failed to find local dir for temp files", fnf);
} catch (IOException e) {
throw new RetriableException("Failed S3 upload", e);
}
}
timer.stop();
}

private String name() {
return configGet("name").orElseThrow(() -> new IllegalWorkerStateException("Tasks always have names"));
}

public void finishWriter(BlockGZIPFileWriter writer, TopicPartition tp) throws IOException {
writerGet(tp).ifPresent(w -> w.finish(tp.topic(), tp.partition()));
writers.remove(tp);
writer.close();
@Override
public void close(Collection<TopicPartition> partitions) {
// have already flushed, so just ensure the temp files are deleted (in case flush threw an exception)
partitions.stream()
.map(this.partitions::get)
.filter(p -> p != null)
.forEach(PartitionWriter::delete);
}

private Optional<S3RecordsWriter> writerGet(TopicPartition tp) {
return ofNullable(writers.get(tp));
@Override
public void open(Collection<TopicPartition> partitions) {
// nothing to do. we will create files when we are given the first record for a partition
// offsets are managed by Connect
}

private BlockGZIPFileWriter createNextBlockWriter(TopicPartition tp, long nextOffset, S3RecordsWriter newWriter) throws ConnectException, IOException {
String name = String.format("%s-%05d", tp.topic(), tp.partition());
String path = configGet("local.buffer.dir")
.orElseThrow(() -> new ConnectException("No local buffer file path configured"));
log.debug("New temp file: {}/{} @ {}", path, name, nextOffset);
Map<String, String> tags = new HashMap<>(this.tags);
tags.put("kafka_topic", tp.topic());
tags.put("kafka_partition", "" + tp.partition());
return new BlockGZIPFileWriter(name, path, nextOffset, this.GZIPChunkThreshold, newWriter.init(tp.topic(), tp.partition(), nextOffset), tags);
private PartitionWriter initWriter(TopicPartition tp, long offset) {
try {
return new PartitionWriter(tp, offset);
} catch (IOException e) {
throw new RetriableException("Error initializing writer for " + tp + " at offset " + offset, e);
}
}

@Override
public void close(Collection<TopicPartition> partitions) {
for (TopicPartition tp : partitions) {
// See if this is a new assignment
BlockGZIPFileWriter writer = this.tmpFiles.remove(tp);
if (writer != null) {
log.info("Revoked partition {} Deleting temp file {}", tp, writer.getDataFileName());
try {
finishWriter(writer, tp);
writer.delete();
} catch (IOException ioe) {
throw new ConnectException("Failed to resume TopicPartition form S3", ioe);
}
}
private class PartitionWriter {
private final TopicPartition tp;
private final BlockGZIPFileWriter writer;
private final S3RecordsWriter format;
private final Map<String, String> tags;
private boolean finished;
private boolean closed;

private PartitionWriter(TopicPartition tp, long firstOffset) throws IOException {
this.tp = tp;
format = recordFormat.newWriter();

String name = String.format("%s-%05d", tp.topic(), tp.partition());
String path = configGet("local.buffer.dir")
.orElseThrow(() -> new ConnectException("No local buffer file path configured"));
log.debug("New temp file: {}/{} @ {}", path, name, firstOffset);

Map<String, String> writerTags = new HashMap<>(S3SinkTask.this.tags);
writerTags.put("kafka_topic", tp.topic());
writerTags.put("kafka_partition", "" + tp.partition());
this.tags = writerTags;

writer = new BlockGZIPFileWriter(name, path, firstOffset, GZIPChunkThreshold, format.init(tp.topic(), tp.partition(), firstOffset));
}
}

@Override
public void open(Collection<TopicPartition> partitions) {
for (TopicPartition tp : partitions) {
// See if this is a new assignment
if (this.tmpFiles.get(tp) == null) {
log.info("Assigned new partition {} creating buffer writer", tp);
try {
recoverPartition(tp);
} catch (IOException ioe) {
throw new ConnectException("Failed to resume TopicPartition from S3", ioe);
}
private void writeAll(Collection<SinkRecord> records) {
metrics.hist(records.size(), "putSize", tags);
try (Metrics.StopTimer ignored = metrics.time("writeAll", tags)) {
writer.write(format.writeBatch(records.stream().map(record -> new ProducerRecord<>(record.topic(), record.kafkaPartition(),
keyConverter.map(c -> c.fromConnectData(record.topic(), record.keySchema(), record.key()))
.orElse(null),
valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value())
))).collect(toList()), records.size());
} catch (IOException e) {
throw new RetriableException("Failed to write to buffer", e);
}
}
}

private void recoverPartition(TopicPartition tp) throws IOException {
this.context.pause(tp);

// Recover last committed offset from S3
long offset = s3.fetchOffset(tp);
public String getDataFilePath() {
return writer.getDataFilePath();
}

log.info("Recovering partition {} from offset {}", tp, offset);
public void delete() {
writer.delete();
partitions.remove(tp);
}

initWriter(tp, offset);
public void done() {
Metrics.StopTimer time = metrics.time("s3Put", tags);
try {
if (!finished) {
writer.write(Arrays.asList(format.finish(tp.topic(), tp.partition())), 0);
finished = true;
}
if (!closed) {
writer.close();
closed = true;
}
s3.putChunk(writer.getDataFilePath(), writer.getIndexFilePath(), tp);
} catch (IOException e) {
throw new RetriableException("Error flushing " + tp, e);
}

this.context.offset(tp, offset);
this.context.resume(tp);
}
delete();
time.stop();
}

private void initWriter(TopicPartition tp, long offset) throws IOException {
S3RecordsWriter newWriter = recordFormat.newWriter();
tmpFiles.put(tp, createNextBlockWriter(tp, offset, newWriter));
writers.put(tp, newWriter);
}

}
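
Taken together, the rework drops the writer-tracked offsets in favor of the contract noted in the XXX comment above: flush() finishes exactly the partitions whose offsets Connect passes in, and the null filter skips partitions whose writers are already gone (for example, just revoked). Below is a condensed, runnable sketch of that lookup pattern using stand-in types; strings and a stub replace TopicPartition and the real PartitionWriter:

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class FlushSketch {
        // Stub standing in for PartitionWriter: done() finishes, uploads, and deletes the temp file.
        static class StubWriter {
            final String tp;
            StubWriter(String tp) { this.tp = tp; }
            void done() { System.out.println("finish + upload + delete temp file for " + tp); }
        }

        public static void main(String[] args) {
            Map<String, StubWriter> partitions = new LinkedHashMap<>();
            partitions.put("logs-0", new StubWriter("logs-0"));
            partitions.put("logs-1", new StubWriter("logs-1"));

            // Connect passes offsets for logs-0 and logs-2; logs-2's writer is already gone (revoked).
            List<String> flushedPartitions = Arrays.asList("logs-0", "logs-2");

            flushedPartitions.stream()
                .map(partitions::get)
                .filter(w -> w != null)      // logs-2 has no writer; nothing to do
                .forEach(StubWriter::done);  // only logs-0 is uploaded
        }
    }

The same null check is what the TODO in the diff flags as a possible place to log a warning.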
@@ -74,7 +74,7 @@ public void testWrite() throws Exception {
totalUncompressedBytes += line.length() + 1;
// Expect to read without newlines...
expectedLines[i] = line;
w.write(toRecord(line));
w.write(toRecord(line), 1);
}

assertEquals(50, w.getNumRecords());
@@ -173,7 +173,7 @@ public void testShouldOverwrite() throws Exception {
String[] expectedLines = new String[5000];
for (int i = 0; i < 5000; i++) {
String line = String.format("Record %d", i);
w.write(toRecord(line));
w.write(toRecord(line), 1);
expectedLines[i] = line;
}

@@ -195,7 +195,7 @@ public void testShouldOverwrite() throws Exception {
String[] expectedLines2 = new String[10];
for (int i = 0; i < 10; i++) {
String line = String.format("Overwrite record %d", i);
w.write(toRecord(line));
w.write(toRecord(line), 1);
expectedLines2[i] = line;
}

@@ -217,7 +217,7 @@ public void testDelete() throws Exception {
String[] expectedLines = new String[5000];
for (int i = 0; i < 5000; i++) {
String line = String.format("Record %d", i);
w.write(toRecord(line));
w.write(toRecord(line), 1);
expectedLines[i] = line;
}

@@ -63,7 +63,7 @@ public void setUp() throws Exception {
private BlockGZIPFileWriter createDummmyFiles(long offset, int numRecords) throws Exception {
BlockGZIPFileWriter writer = new BlockGZIPFileWriter("bar-00000", tmpDir, offset);
for (int i = 0; i < numRecords; i++) {
writer.write(Arrays.asList(String.format("Record %d", i).getBytes()));
writer.write(Arrays.asList(String.format("Record %d", i).getBytes()), 1);
}
writer.close();
return writer;