forked from jqnatividad/qsv
-
Notifications
You must be signed in to change notification settings - Fork 0
/
snappy.rs
155 lines (128 loc) · 4.99 KB
/
snappy.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Help/usage text parsed at runtime by `util::get_args` (docopt-style).
// The section headers ("Usage:", "snappy arguments:", "options:") and the
// exact subcommand/flag spellings are part of the CLI contract — do not
// reword them without updating the argument parser expectations.
static USAGE: &str = r#"
Does streaming compression/decompression of the input using the Snappy format.
https://google.github.io/snappy/
It has three subcommands:
compress: Compress the input (multi-threaded).
decompress: Decompress the input.
check: Check if the input is a valid Snappy file. Returns exitcode 0 if valid,
exitcode 1 otherwise.
Note that most qsv commands will automatically decompress Snappy files if the
input file has an ".sz" extension. It will also automatically compress the output
file (though only single-threaded) if the --output file has an ".sz" extension.
This command's multi-threaded compression is 4-5x faster than qsv's automatic
single-threaded compression. However, the number of --jobs is capped at 8 threads
as benchmarks indicate diminishing returns beyond that.
https://raw.githubusercontent.com/sstadick/gzp/main/violin.svg
Also, this command is not specific to CSV data, it can compress/decompress any file.
For examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_snappy.rs.
Usage:
qsv snappy compress [options] [<input>]
qsv snappy decompress [options] [<input>]
qsv snappy check [<input>]
qsv snappy --help
snappy arguments:
<input> The input file to compress/decompress. If not specified, stdin is used.
options:
-h, --help Display this message
-o, --output <file> Write output to <output> instead of stdout.
-j, --jobs <arg> The number of jobs to run in parallel when compressing.
When not set, the number of jobs is set to 8 or the number
of CPUs detected, whichever is smaller.
"#;
use std::{
    env, fs,
    io::{self, stdin, BufRead, Read, Write},
};

// `ZWriter` provides the `finish()` method used to join the parallel
// compressor's worker threads and surface any trailing write errors.
use gzp::{par::compress::ParCompressBuilder, snap::Snap, ZWriter};
use serde::Deserialize;
use snap;

use crate::{config, util, CliError, CliResult};
/// Command-line arguments as deserialized by `util::get_args` from USAGE.
/// Field names mirror the docopt conventions: `arg_*` for positionals,
/// `cmd_*` for subcommands, `flag_*` for options — do not rename.
#[derive(Deserialize)]
struct Args {
    // Optional input path; None means read from stdin.
    arg_input: Option<String>,
    // Optional output path; None means write to stdout.
    flag_output: Option<String>,
    // Exactly one of the three subcommand booleans is true after parsing.
    cmd_compress: bool,
    cmd_decompress: bool,
    cmd_check: bool,
    // Requested parallelism for `compress`; None defers to util::njobs.
    flag_jobs: Option<usize>,
}
impl From<snap::Error> for CliError {
    /// Wrap a raw snap codec error in the CLI-level error type so `?`
    /// can propagate snap failures out of this command.
    fn from(err: snap::Error) -> CliError {
        let msg = format!("Snap error: {err:?}");
        CliError::Other(msg)
    }
}
/// Entry point for the `snappy` command.
///
/// Parses `argv` against USAGE, then dispatches to `compress`,
/// `decompress`, or `check`. Input defaults to stdin when no `<input>`
/// positional is given; output defaults to stdout when `--output` is
/// absent. Both streams are buffered.
///
/// # Errors
/// Propagates I/O errors opening/creating files and any error from the
/// selected subcommand; `check` on a non-Snappy input returns a CLI error.
pub fn run(argv: &[&str]) -> CliResult<()> {
    let args: Args = util::get_args(USAGE, argv)?;

    // Buffered reader over the input file, or stdin when no path was given.
    let input_reader: Box<dyn BufRead> = match &args.arg_input {
        Some(input_path) => {
            let file = fs::File::open(input_path)?;
            Box::new(io::BufReader::with_capacity(
                config::DEFAULT_RDR_BUFFER_CAPACITY,
                file,
            ))
        }
        None => Box::new(io::BufReader::new(stdin().lock())),
    };

    // Buffered writer to --output, or stdout otherwise. `Send + 'static`
    // is required because the parallel compressor moves the writer into
    // its worker threads.
    let output_writer: Box<dyn Write + Send + 'static> = match &args.flag_output {
        Some(output_path) => Box::new(io::BufWriter::with_capacity(
            config::DEFAULT_WTR_BUFFER_CAPACITY,
            fs::File::create(output_path)?,
        )),
        None => Box::new(io::BufWriter::with_capacity(
            config::DEFAULT_WTR_BUFFER_CAPACITY,
            io::stdout(),
        )),
    };

    if args.cmd_compress {
        // Cap at 8 threads — benchmarks show diminishing returns beyond
        // that (see USAGE). `.min(8)` replaces the manual if-reassignment.
        let jobs = util::njobs(args.flag_jobs).min(8);
        compress(input_reader, output_writer, jobs)?;
    } else if args.cmd_decompress {
        decompress(input_reader, output_writer)?;
    } else if args.cmd_check {
        if check(input_reader) {
            // Informational message goes to stderr so stdout stays clean.
            eprintln!("Snappy file.");
        } else {
            return fail_clierror!("Not a snappy file.");
        }
    }
    Ok(())
}
// multi-threaded streaming snappy compression
fn compress<R: Read, W: Write + Send + 'static>(mut src: R, dst: W, jobs: usize) -> CliResult<()> {
let rdr_capacitys = env::var("QSV_RDR_BUFFER_CAPACITY")
.unwrap_or_else(|_| config::DEFAULT_RDR_BUFFER_CAPACITY.to_string());
let mut buffer_size: usize = rdr_capacitys
.parse()
.unwrap_or(config::DEFAULT_RDR_BUFFER_CAPACITY);
// the buffer size must be at least 32768 bytes, otherwise, ParCompressBuilder panics
// as it expects the buffer size to be greater than its DICT_SIZE which is 32768
if buffer_size < 32768 {
buffer_size = 32768
};
let mut writer = ParCompressBuilder::<Snap>::new()
.num_threads(jobs)
.unwrap()
.buffer_size(buffer_size)
.unwrap()
.pin_threads(Some(0))
.from_writer(dst);
io::copy(&mut src, &mut writer)?;
Ok(())
}
// streaming snappy decompression
/// Stream-decompress Snappy-framed `src` into `dst` (single-threaded).
fn decompress<R: Read, W: Write>(src: R, mut dst: W) -> CliResult<()> {
    let mut decoder = snap::read::FrameDecoder::new(src);
    io::copy(&mut decoder, &mut dst)?;
    Ok(())
}
// check if a file is a valid snappy file
/// Returns true when the leading bytes of `src` decode as a Snappy frame.
fn check<R: Read>(src: R) -> bool {
    // Try to decode at most 50 bytes: the frame decoder errors out
    // immediately on a bad stream identifier, so a short successful
    // read is sufficient evidence of a valid Snappy file.
    let mut probe = Vec::with_capacity(51);
    snap::read::FrameDecoder::new(src)
        .take(50)
        .read_to_end(&mut probe)
        .is_ok()
}