Skip to content

Commit

Permalink
2.2.8: fixing header compression
Browse files Browse the repository at this point in the history
  • Loading branch information
Tagar committed Jul 30, 2018
1 parent fd5b4f0 commit 5bab5e2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
23 changes: 17 additions & 6 deletions abalon/spark/sparkutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ def debug_print (message):
raise


def HDFSwriteString (dst_file, content, overwrite=True, appendEOL=True):
def HDFSwriteString (dst_file, content, overwrite=True, appendEOL=True, compression='none'):

"""
Creates an HDFS file with given content.
Expand All @@ -403,15 +403,23 @@ def HDFSwriteString (dst_file, content, overwrite=True, appendEOL=True):
:param content: string to be written to the file
:param overwrite: overwrite target file?
:param appendEOL: append new line character?
:param compression: none, bzip2 or gzip
"""

sparkutils_init()

out_stream = fs.create(hadoop.fs.Path(dst_file), overwrite)

if appendEOL:
content += "\n"

if compression=='gzip':
import zlib
content = zlib.compress(content)
elif compression=='bzip2':
import bz2
content = bz2.compress(content)

out_stream = fs.create(hadoop.fs.Path(dst_file), overwrite)

try:
out_stream.write(bytearray(content))
finally:
Expand Down Expand Up @@ -442,8 +450,8 @@ def dataframeToHDFSfile (dataframe, dst_file, overwrite=False
:param escape: character - by default the escape character is \, but can be set to any character.
Escaped quote characters are ignored
:param quoteMode: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/QuoteMode.html
:param compression: compression codec to use when saving to file. This can be one of the known case-insensitive
shorten names (none, bzip2, gzip, lz4, snappy and deflate).
:param compression: compression codec to use when saving to file. This can be one of the known
short names (none, bzip2, gzip); lz4, snappy and deflate are only supported with header=False
"""

Expand All @@ -454,6 +462,9 @@ def dataframeToHDFSfile (dataframe, dst_file, overwrite=False

dst_dir = dst_file + '.tmpdir'

if header and compression not in ['none', 'bzip2', 'gzip']:
raise ValueError("Header compression only supports 'gzip' and 'bzip2'")

(dataframe
.write
.option('header', False) # always save without header as if there are multiple partitions,
Expand All @@ -472,7 +483,7 @@ def dataframeToHDFSfile (dataframe, dst_file, overwrite=False
header_record = delimiter.join(dataframe.columns)
header_filename = "{}/--00_header.csv".format(dst_dir) # have to make sure header filename is 1st in
# alphabetical order
HDFSwriteString(header_filename, header_record)
HDFSwriteString(header_filename, header_record, compression)

HDFScopyMerge(dst_dir, dst_file, overwrite, deleteSource=True)

Expand Down
2 changes: 1 addition & 1 deletion abalon/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

version = '2.2.7'
version = '2.2.8'


0 comments on commit 5bab5e2

Please sign in to comment.