Skip to content

Commit

Permalink
DRILL-7828: Refactor Pcap and Pcapng format plugin (apache#2192)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdiravka committed Apr 26, 2021
1 parent b5e01be commit 360b080
Show file tree
Hide file tree
Showing 55 changed files with 533 additions and 442 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
import org.apache.drill.exec.store.pcap.decoder.TcpSession;
import org.apache.drill.exec.store.pcap.schema.Schema;
import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
Expand All @@ -48,126 +49,61 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class);

private FileSplit split;

private PacketDecoder decoder;

private InputStream fsStream;

private RowSetLoader rowWriter;

private int validBytes;

private byte[] buffer;

private int offset;

private ScalarWriter typeWriter;

private ScalarWriter timestampWriter;

private ScalarWriter timestampMicroWriter;

private ScalarWriter networkWriter;

private ScalarWriter srcMacAddressWriter;

private ScalarWriter dstMacAddressWriter;

private ScalarWriter dstIPWriter;

private ScalarWriter srcIPWriter;

private ScalarWriter srcPortWriter;

private ScalarWriter dstPortWriter;

private ScalarWriter packetLengthWriter;

private ScalarWriter tcpSessionWriter;

private ScalarWriter tcpSequenceWriter;

private ScalarWriter tcpAckNumberWriter;

private ScalarWriter tcpFlagsWriter;

private ScalarWriter tcpParsedFlagsWriter;

private ScalarWriter tcpNsWriter;

private ScalarWriter tcpCwrWriter;

private ScalarWriter tcpEceWriter;

private ScalarWriter tcpFlagsEceEcnCapableWriter;

private ScalarWriter tcpFlagsCongestionWriter;

private ScalarWriter tcpUrgWriter;

private ScalarWriter tcpAckWriter;

private ScalarWriter tcpPshWriter;

private ScalarWriter tcpRstWriter;

private ScalarWriter tcpSynWriter;

private ScalarWriter tcpFinWriter;

private ScalarWriter dataWriter;

private ScalarWriter isCorruptWriter;

private final PcapReaderConfig readerConfig;


private final PcapFormatConfig readerConfig;
// Writers for TCP Sessions
private ScalarWriter sessionStartTimeWriter;

private ScalarWriter sessionEndTimeWriter;

private ScalarWriter sessionDurationWriter;

private ScalarWriter connectionTimeWriter;

private ScalarWriter packetCountWriter;

private ScalarWriter originPacketCounterWriter;

private ScalarWriter remotePacketCounterWriter;

private ScalarWriter originDataVolumeWriter;

private ScalarWriter remoteDataVolumeWriter;

private ScalarWriter hostDataWriter;

private ScalarWriter remoteDataWriter;

private final int maxRecords;

private Map<Long, TcpSession> sessionQueue;


public static class PcapReaderConfig {

protected final PcapFormatPlugin plugin;

public boolean sessionizeTCPStreams;

private final PcapFormatConfig config;

public PcapReaderConfig(PcapFormatPlugin plugin) {
this.plugin = plugin;
this.config = plugin.getConfig();
this.sessionizeTCPStreams = config.getSessionizeTCPStreams();
}
}

public PcapBatchReader(PcapReaderConfig readerConfig, int maxRecords) {
public PcapBatchReader(PcapFormatConfig readerConfig, int maxRecords) {
this.readerConfig = readerConfig;
if (readerConfig.sessionizeTCPStreams) {
if (readerConfig.getSessionizeTCPStreams()) {
sessionQueue = new HashMap<>();
}
this.maxRecords = maxRecords;
Expand All @@ -178,7 +114,7 @@ public boolean open(FileSchemaNegotiator negotiator) {
split = negotiator.split();
openFile(negotiator);
SchemaBuilder builder = new SchemaBuilder();
Schema pcapSchema = new Schema(readerConfig.sessionizeTCPStreams);
Schema pcapSchema = new Schema(readerConfig.getSessionizeTCPStreams());
TupleMetadata schema = pcapSchema.buildSchema(builder);
negotiator.tableSchema(schema, false);
ResultSetLoader loader = negotiator.build();
Expand Down Expand Up @@ -238,7 +174,7 @@ private void openFile(FileSchemaNegotiator negotiator) {
}

private void populateColumnWriters(RowSetLoader rowWriter) {
if (readerConfig.sessionizeTCPStreams) {
if (readerConfig.getSessionizeTCPStreams()) {
srcMacAddressWriter = rowWriter.scalar("src_mac_address");
dstMacAddressWriter = rowWriter.scalar("dst_mac_address");
dstIPWriter = rowWriter.scalar("dst_ip");
Expand Down Expand Up @@ -323,7 +259,7 @@ private boolean parseNextPacket(RowSetLoader rowWriter) {
}

// If we are resessionizing the TCP Stream, add the packet to the stream
if (readerConfig.sessionizeTCPStreams) {
if (readerConfig.getSessionizeTCPStreams()) {
// If the session has not been seen before, add it to the queue
long sessionID = packet.getSessionHash();
if (!sessionQueue.containsKey(sessionID)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static int getIntFileOrder(boolean byteOrder, final byte[] buf, final int

/**
*
* @param byteOrder true for forward file order, false fore revers file order
* @param byteOrder true for forward file order, false for reverse file order
* @param buf byte buffer
* @param offset buffer offset
* @return short value as int of specific bytes from buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ public class PacketDecoder {
private static final int GLOBAL_HEADER_SIZE = 24;
private static final int PCAP_MAGIC_LITTLE_ENDIAN = 0xD4C3B2A1;
private static final int PCAP_MAGIC_NUMBER = 0xA1B2C3D4;
private static final int PCAPNG_MAGIC_LITTLE_ENDIAN = 0x4D3C2B1A;
private static final int PCAPNG_MAGIC_NUMBER = 0x0A0D0D0A;

private static final Logger logger = LoggerFactory.getLogger(PacketDecoder.class);

private final int maxLength;
private final int network;
private boolean bigEndian;
private FileFormat fileFormat;

private InputStream input;

Expand All @@ -78,16 +81,28 @@ public PacketDecoder(final InputStream input) throws IOException {
switch (getInt(globalHeader, 0)) {
case PCAP_MAGIC_NUMBER:
bigEndian = true;
fileFormat = FileFormat.PCAP;
break;
case PCAP_MAGIC_LITTLE_ENDIAN:
bigEndian = false;
fileFormat = FileFormat.PCAP;
break;
case PCAPNG_MAGIC_NUMBER:
bigEndian = true;
fileFormat = FileFormat.PCAPNG;
break;
case PCAPNG_MAGIC_LITTLE_ENDIAN:
bigEndian = false;
fileFormat = FileFormat.PCAPNG;
break;
default:
//noinspection ConstantConditions
Preconditions.checkState(false,
String.format("Bad magic number = %08x", getIntFileOrder(bigEndian, globalHeader, 0)));
}
Preconditions.checkState(getShortFileOrder(bigEndian, globalHeader, 4) == 2, "Wanted major version == 2");
if(fileFormat == FileFormat.PCAP) {
Preconditions.checkState(getShortFileOrder(bigEndian, globalHeader, 4) == 2, "Wanted major version == 2");
} // todo: pcapng major version == 1 precondition
maxLength = getIntFileOrder(bigEndian, globalHeader, 16);
network = getIntFileOrder(bigEndian, globalHeader, 20);
}
Expand Down Expand Up @@ -116,6 +131,10 @@ public boolean isBigEndian() {
return bigEndian;
}

public FileFormat getFileFormat() {
return fileFormat;
}

public Packet nextPacket() throws IOException {
Packet r = new Packet();
if (r.readPcap(input, bigEndian, maxLength)) {
Expand All @@ -124,4 +143,10 @@ public Packet nextPacket() throws IOException {
return null;
}
}

public enum FileFormat {
PCAP,
PCAPNG,
UNKNOWN
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.pcap.plugin;

import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.store.pcap.PcapBatchReader;
import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
import org.apache.drill.exec.store.pcapng.PcapngBatchReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;

public abstract class BasePcapFormatPlugin<T extends PcapFormatConfig> extends EasyFormatPlugin<T> {

static final Logger logger = LoggerFactory.getLogger(ManagedScanFramework.class);
private static PacketDecoder.FileFormat fileFormat = PacketDecoder.FileFormat.UNKNOWN;

public BasePcapFormatPlugin(String name,
DrillbitContext context,
Configuration fsConf,
StoragePluginConfig storageConfig,
T formatConfig) {
super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
}

private static EasyFormatConfig easyConfig(Configuration fsConf, PcapFormatConfig pluginConfig) {
return EasyFormatConfig.builder()
.readable(true)
.writable(false)
.blockSplittable(false)
.compressible(true)
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.useEnhancedScan(true)
.supportsLimitPushdown(true)
.supportsProjectPushdown(true)
.defaultName(PcapFormatConfig.NAME)
.build();
}

private static class PcapReaderFactory extends FileReaderFactory {

private final PcapFormatConfig config;
private final EasySubScan scan;

public PcapReaderFactory(PcapFormatConfig config, EasySubScan scan) {
this.config = config;
this.scan = scan;
}

/**
* Reader creator. If file format can't be detected try to use default PCAP format plugin
*
* @return PCAP or PCAPNG batch reader
*/
@Override
public ManagedReader<? extends FileSchemaNegotiator> newReader() {
if (fileFramework().isPresent()) { // todo: can be simplified with java9 ifPresentOrElse
Path path = scan.getWorkUnits().stream()
.findFirst()
.orElseThrow(() -> UserException.
dataReadError()
.addContext("There are no files for scanning")
.build(logger))
.getPath();
fileFormat = getFileFormat(fileFramework().get().fileSystem(), path);
if (config.getExtensions().stream()
.noneMatch(f -> f.equals(fileFormat.name().toLowerCase()))) {
logger.error("File format {} is not within plugin extensions: {}. Trying to use default PCAP format plugin to " +
"read the file", fileFormat, config.getExtensions());
}
} else {
logger.error("It is not possible to detect file format, because the File Framework is not initialized. " +
"Trying to use default PCAP format plugin to read the file");
}
return createReader(scan, config);
}
}

@Override
public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
return createReader(scan, formatConfig);
}

private static ManagedReader<? extends FileSchemaNegotiator> createReader(EasySubScan scan, PcapFormatConfig config) {
switch(fileFormat) {
case PCAPNG: return new PcapngBatchReader(config, scan);
case PCAP:
case UNKNOWN:
default: return new PcapBatchReader(config, scan.getMaxRecords());
}
}

@Override
protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
FileScanBuilder builder = new FileScanBuilder();
builder.setReaderFactory(new PcapReaderFactory(formatConfig, scan));

initScanBuilder(builder, scan);
builder.nullType(Types.optional(MinorType.VARCHAR));
return builder;
}

/**
* Helper method to detect PCAP or PCAPNG file format based on file Magic Number
*
* @param dfs for obtaining InputStream
* @return PCAP/PCAPNG file format
*/
private static PacketDecoder.FileFormat getFileFormat(DrillFileSystem dfs, Path path) {
try (InputStream inputStream = dfs.openPossiblyCompressedStream(path)) {
PacketDecoder decoder = new PacketDecoder(inputStream);
return decoder.getFileFormat();
} catch (IOException io) {
throw UserException
.dataReadError(io)
.addContext("File name:", path.toString())
.build(logger);
}
}
}
Loading

0 comments on commit 360b080

Please sign in to comment.