diff --git a/nemo_curator/utils/file_utils.py b/nemo_curator/utils/file_utils.py
index af3c2513..67a70e2f 100644
--- a/nemo_curator/utils/file_utils.py
+++ b/nemo_curator/utils/file_utils.py
@@ -182,8 +182,7 @@ def _save_jsonl(documents, output_path, start_index=0, max_index=10000, prefix=N
     """Worker function to write out the data to jsonl files"""
 
     def _output_json(document):
-        myjson = json.dumps(document, ensure_ascii=False)
-        return myjson.encode("utf-8")
+        return document.strip().encode('utf-8')
 
     def _name(start_index, npad, prefix, i):
         tag = str(start_index + i).rjust(npad, "0")
@@ -195,11 +194,19 @@ def _name(start_index, npad, prefix, i):
 
     output_glob_string = os.path.join(output_path, "*.jsonl")
 
-    documents.map(_output_json).to_textfiles(
+    output_files = documents.map(_output_json).to_textfiles(
         output_glob_string,
         name_function=name,
     )
 
+    # Delete empty files generated due to empty partitions in the bag
+    for output_file in output_files:
+        try:
+            if os.path.getsize(output_file) == 0:
+                os.remove(output_file)
+        except Exception as exception:
+            print(f"An exception occurred when trying to delete {output_file}.\n{exception}", flush=True)
+
 
 def reshard_jsonl(
     input_dir, output_dir, output_file_size="100M", start_index=0, file_prefix=""
@@ -222,7 +229,7 @@ def reshard_jsonl(
     input_files = list(get_all_files_paths_under(input_dir))
 
     # Read in the dask bag
-    b = db.read_text(input_files, blocksize=blocksize).map(json.loads)
+    b = db.read_text(input_files, blocksize=blocksize)
 
     # Prepare the output
    output_dir = expand_outdir_and_mkdir(output_dir)
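
Note on the change: `reshard_jsonl` now keeps each input line as raw text instead of round-tripping it through `json.loads` and `json.dumps`, so `_output_json` only strips and re-encodes the line. Separately, `to_textfiles` writes one file per bag partition and returns the paths it wrote, so empty partitions used to leave zero-byte `.jsonl` shards behind; the new loop removes them. Below is a minimal standalone sketch of that behavior, not NeMo Curator's actual pipeline: the `/tmp/reshard_demo` path and the four-record bag are invented for illustration, and the filter is just one way to produce empty partitions.

```python
import os

import dask.bag as db

# Four one-record partitions; the filter empties three of them, which is
# how empty partitions arise in practice (e.g. after filtering a bag).
records = ['{"id": %d}' % i for i in range(4)]
bag = db.from_sequence(records, npartitions=4)
kept = bag.filter(lambda line: '"id": 0' in line)

out_dir = "/tmp/reshard_demo"  # hypothetical path for this sketch
os.makedirs(out_dir, exist_ok=True)

# to_textfiles() writes one file per partition and returns their paths,
# so the three empty partitions produce three zero-byte files.
output_files = kept.to_textfiles(os.path.join(out_dir, "*.jsonl"))

# Same cleanup pattern as the patch: drop the zero-byte shards.
for output_file in output_files:
    if os.path.getsize(output_file) == 0:
        os.remove(output_file)

print(sorted(os.listdir(out_dir)))  # only the non-empty shard remains
```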