From 5bab5e29e0f9c1f9036c0191eebc8481b2d3da9c Mon Sep 17 00:00:00 2001
From: Ruslan Dautkhanov
Date: Mon, 30 Jul 2018 17:03:33 -0600
Subject: [PATCH] 2.2.8: fixing header compression

---
 abalon/spark/sparkutils.py | 27 +++++++++++++++++++++------
 abalon/version.py          |  2 +-
 2 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/abalon/spark/sparkutils.py b/abalon/spark/sparkutils.py
index 22429bb..6f05693 100644
--- a/abalon/spark/sparkutils.py
+++ b/abalon/spark/sparkutils.py
@@ -393,7 +393,7 @@ def debug_print (message):
         raise
 
 
-def HDFSwriteString (dst_file, content, overwrite=True, appendEOL=True):
+def HDFSwriteString (dst_file, content, overwrite=True, appendEOL=True, compression='none'):
 
     """
     Creates an HDFS file with given content.
@@ -403,15 +403,27 @@ def HDFSwriteString (dst_file, content, overwrite=True, appendEOL=True):
     :param content: string to be written to the file
     :param overwrite: overwrite target file?
     :param appendEOL: append new line character?
+    :param compression: none, bzip2 or gzip
     """
 
     sparkutils_init()
 
-    out_stream = fs.create(hadoop.fs.Path(dst_file), overwrite)
-
     if appendEOL:
         content += "\n"
 
+    if compression=='gzip':
+        import zlib
+        # wbits=MAX_WBITS|16 makes zlib emit a gzip (RFC 1952) stream, so this header
+        # member can be concatenated with Spark's gzip-codec part files by HDFScopyMerge;
+        # a bare zlib.compress() would emit a raw zlib stream that gunzip cannot read
+        gz = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
+        content = gz.compress(content) + gz.flush()
+    elif compression=='bzip2':
+        import bz2
+        content = bz2.compress(content)
+
+    out_stream = fs.create(hadoop.fs.Path(dst_file), overwrite)
+
     try:
         out_stream.write(bytearray(content))
     finally:
@@ -442,7 +454,7 @@ def dataframeToHDFSfile (dataframe, dst_file, overwrite=False
     :param escape: character - by default the escape character is \, but can be set to any character. Escaped quote
            characters are ignored
     :param quoteMode: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/QuoteMode.html
-    :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive
-        shorten names (none, bzip2, gzip, lz4, snappy and deflate).
+    :param compression: compression codec to use when saving to file. This can be one of the known
+        shorten names (none, bzip2, gzip). lz4, snappy and deflate only supported with header=False
     """
 
@@ -454,6 +466,9 @@ def dataframeToHDFSfile (dataframe, dst_file, overwrite=False
 
     dst_dir = dst_file + '.tmpdir'
 
+    if header and compression not in ['none', 'bzip2', 'gzip']:
+        raise ValueError("Header compression only supports 'gzip' and 'bzip2'")
+
     (dataframe
      .write
      .option('header', False)         # always save without header as if there are multiple partitions,
@@ -472,7 +487,7 @@ def dataframeToHDFSfile (dataframe, dst_file, overwrite=False
         header_record = delimiter.join(dataframe.columns)
         header_filename = "{}/--00_header.csv".format(dst_dir)   # have to make sure header filename is 1st in
                                                                  # alphabetical order
-        HDFSwriteString(header_filename, header_record)
+        HDFSwriteString(header_filename, header_record, compression=compression)
 
     HDFScopyMerge(dst_dir, dst_file, overwrite, deleteSource=True)
 
diff --git a/abalon/version.py b/abalon/version.py
index 28d779f..ee0c89e 100644
--- a/abalon/version.py
+++ b/abalon/version.py
@@ -12,5 +12,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version = '2.2.7'
+version = '2.2.8'
 