Skip to content

Commit

Permalink
feat: Protobuf serde for Json file sink (#8062)
Browse files Browse the repository at this point in the history
* Protobuf serde for Json file sink

* Fix tests

* Fix test
  • Loading branch information
Jefffrey authored Nov 8, 2023
1 parent 21b2af1 commit 965b318
Show file tree
Hide file tree
Showing 13 changed files with 1,703 additions and 208 deletions.
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl BatchSerializer for JsonSerializer {
}

/// Implements [`DataSink`] for writing to a Json file.
struct JsonSink {
pub struct JsonSink {
/// Config options for writing data
config: FileSinkConfig,
}
Expand Down Expand Up @@ -258,10 +258,16 @@ impl DisplayAs for JsonSink {
}

impl JsonSink {
fn new(config: FileSinkConfig) -> Self {
/// Create from config.
pub fn new(config: FileSinkConfig) -> Self {
Self { config }
}

/// Retrieve the inner [`FileSinkConfig`].
pub fn config(&self) -> &FileSinkConfig {
&self.config
}

async fn append_all(
&self,
data: SendableRecordBatchStream,
Expand Down
12 changes: 11 additions & 1 deletion datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,21 @@ impl FileSinkExec {
}
}

/// Input execution plan
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}

/// Returns insert sink
pub fn sink(&self) -> &dyn DataSink {
self.sink.as_ref()
}

/// Optional sort order for output data
pub fn sort_order(&self) -> &Option<Vec<PhysicalSortRequirement>> {
&self.sort_order
}

/// Returns the metrics of the underlying [DataSink]
pub fn metrics(&self) -> Option<MetricsSet> {
self.sink.metrics()
Expand All @@ -170,7 +180,7 @@ impl DisplayAs for FileSinkExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "InsertExec: sink=")?;
write!(f, "FileSinkExec: sink=")?;
self.sink.fmt_as(t, f)
}
}
Expand Down
54 changes: 54 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1130,9 +1130,63 @@ message PhysicalPlanNode {
SortPreservingMergeExecNode sort_preserving_merge = 21;
NestedLoopJoinExecNode nested_loop_join = 22;
AnalyzeExecNode analyze = 23;
JsonSinkExecNode json_sink = 24;
}
}

enum FileWriterMode {
APPEND = 0;
PUT = 1;
PUT_MULTIPART = 2;
}

enum CompressionTypeVariant {
GZIP = 0;
BZIP2 = 1;
XZ = 2;
ZSTD = 3;
UNCOMPRESSED = 4;
}

message PartitionColumn {
string name = 1;
ArrowType arrow_type = 2;
}

message FileTypeWriterOptions {
oneof FileType {
JsonWriterOptions json_options = 1;
}
}

message JsonWriterOptions {
CompressionTypeVariant compression = 1;
}

message FileSinkConfig {
string object_store_url = 1;
repeated PartitionedFile file_groups = 2;
repeated string table_paths = 3;
Schema output_schema = 4;
repeated PartitionColumn table_partition_cols = 5;
FileWriterMode writer_mode = 6;
bool single_file_output = 7;
bool unbounded_input = 8;
bool overwrite = 9;
FileTypeWriterOptions file_type_writer_options = 10;
}

message JsonSink {
FileSinkConfig config = 1;
}

message JsonSinkExecNode {
PhysicalPlanNode input = 1;
JsonSink sink = 2;
Schema sink_schema = 3;
PhysicalSortExprNodeCollection sort_order = 4;
}

message PhysicalExtensionNode {
bytes node = 1;
repeated PhysicalPlanNode inputs = 2;
Expand Down
Loading

0 comments on commit 965b318

Please sign in to comment.