diff --git a/examples/tensorflow/TFRecords-To-Parquet.ipynb b/examples/tensorflow/TFRecords-To-Parquet.ipynb new file mode 100644 index 00000000000..52e9834da99 --- /dev/null +++ b/examples/tensorflow/TFRecords-To-Parquet.ipynb @@ -0,0 +1,1119 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "1d4a2a17", + "metadata": {}, + "outputs": [], + "source": [ + "# Copyright 2021 NVIDIA Corporation. All Rights Reserved.\n", + "#\n", + "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", + "# you may not use this file except in compliance with the License.\n", + "# You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing, software\n", + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", + "# See the License for the specific language governing permissions and\n", + "# limitations under the License.\n", + "# ==============================================================================" + ] + }, + { + "cell_type": "markdown", + "id": "7da4cfc5", + "metadata": {}, + "source": [ + "\n", + "\n", + "# TensorFlow: Convert TFRecords to Parquet files\n", + "\n", + "## TFRecords\n", + "\n", + "[TFRecords](https://www.tensorflow.org/tutorials/load_data/tfrecord) are a popular file format for storing data for deep learning training with TensorFlow. It is a \"simple format for storing a sequence of binary records\". In many cases, the dataset is too large for host memory, so it is converted into (multiple) tfrecords files on disk. TensorFlow's ecosystem makes it possible to stream the tfrecords from disk to train the model without loading the full dataset into memory.\n", + "\n", + "That sounds great, but there are some disadvantages when working with tabular datasets. TFRecords store the dataset as key-value pairs. In other domains, such as computer vision, this representation is efficient: the key is `image` and the values are the pixels. For an RGB image with 200x200 resolution, there are 120,000 (200x200x3) values. In a tabular dataset, a feature is often a single number, so storing a key for every example adds significant overhead. **In some of our experiments, we found that tfrecords can be ~4-5x larger than `parquet` files for the same dataset.**\n",
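+ "\n", + "To make the overhead concrete, here is a minimal sketch of serializing a single tabular row as a `tf.train.Example`; the feature names are stored again for every row:\n", + "\n", + "```python\n", + "import tensorflow as tf\n", + "\n", + "# One tabular row: the keys \"cont0\" and \"cat0\" are repeated in every serialized example.\n", + "example = tf.train.Example(\n", + "    features=tf.train.Features(\n", + "        feature={\n", + "            \"cont0\": tf.train.Feature(float_list=tf.train.FloatList(value=[0.5])),\n", + "            \"cat0\": tf.train.Feature(int64_list=tf.train.Int64List(value=[3])),\n", + "        }\n", + "    )\n", + ")\n", + "len(example.SerializeToString())  # tens of bytes for two scalar values\n", + "```\n",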

\n", + "[Parquet](https://en.wikipedia.org/wiki/Apache_Parquet) is another file format to store data. It is a free and open-source data storage format in the Hadoop ecosystem. Many popular systems, such as Spark or Pandas, support to read and write parquet files. \n", + "
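+ "\n", + "For instance, a minimal round trip with pandas (assuming a parquet engine such as pyarrow is installed) looks like this:\n", + "\n", + "```python\n", + "import pandas as pd\n", + "\n", + "df = pd.DataFrame({\"cont0\": [0.5, -0.1], \"cat0\": [3, 7]})\n", + "df.to_parquet(\"example.parquet\")  # write the dataframe to a parquet file\n", + "df = pd.read_parquet(\"example.parquet\")  # read it back\n", + "```\n",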

\n", + "We developed [NVTabular Data Loaders](https://nvidia.github.io/NVTabular/main/training/index.html) as a customized data loader, fully operating on the GPU. It reads the data from disk into the GPU memory and prepares the next batch on the GPU. Therefore, we do not have any CPU-GPU communication. Our data loader leverages parquet files to reduce the disk pressure. **In our experiments, we experienced that the native data loader is the bottleneck in training tabular deep learning models and by changing the native data loader to NVTabular Data Loader, we saw a 8-9x speed-up.**\n", + "\n", + "### Convert TFRecords to Parquet files\n", + "That is a lot of background information. In many cases, we saw that users have their dataset stored as tfrecords files. In this notebook, we provide a tfrecords to parquet examples. Users can transform their dataset to parquet and be able to experiment with NVTabular data loader." + ] + }, + { + "cell_type": "markdown", + "id": "096a7716", + "metadata": {}, + "source": [ + "We leverage the library pandas-tfrecords." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "35e6c8d4", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[33mWARNING: Value for scheme.platlib does not match. Please report this to \n", + "distutils: /usr/local/lib/python3.8/dist-packages\n", + "sysconfig: /usr/lib/python3.8/site-packages\u001b[0m\n", + "\u001b[33mWARNING: Value for scheme.purelib does not match. Please report this to \n", + "distutils: /usr/local/lib/python3.8/dist-packages\n", + "sysconfig: /usr/lib/python3.8/site-packages\u001b[0m\n", + "\u001b[33mWARNING: Value for scheme.headers does not match. Please report this to \n", + "distutils: /usr/local/include/python3.8/UNKNOWN\n", + "sysconfig: /usr/include/python3.8/UNKNOWN\u001b[0m\n", + "\u001b[33mWARNING: Value for scheme.scripts does not match. Please report this to \n", + "distutils: /usr/local/bin\n", + "sysconfig: /usr/bin\u001b[0m\n", + "\u001b[33mWARNING: Value for scheme.data does not match. 
Please report this to \n", + "distutils: /usr/local\n", + "sysconfig: /usr\u001b[0m\n", + "\u001b[33mWARNING: Additional context:\n", + "user = False\n", + "home = None\n", + "root = None\n", + "prefix = None\u001b[0m\n", + "Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com\n", + "Requirement already satisfied: pandas-tfrecords==0.1.5 in /usr/local/lib/python3.8/dist-packages (0.1.5)\n", + "Requirement already satisfied: pandas==1.2.4 in /usr/local/lib/python3.8/dist-packages (from pandas-tfrecords==0.1.5) (1.2.4)\n", + "Requirement already satisfied: numpy>=1.16.5 in /usr/local/lib/python3.8/dist-packages (from pandas-tfrecords==0.1.5) (1.19.5)\n", + "Requirement already satisfied: s3fs==2021.6.0 in /usr/local/lib/python3.8/dist-packages (from pandas-tfrecords==0.1.5) (2021.6.0)\n", + "Requirement already satisfied: tensorflow==2.5.0 in /usr/local/lib/python3.8/dist-packages (from pandas-tfrecords==0.1.5) (2.5.0)\n", + "Requirement already satisfied: python-dateutil>=2.7.3 in /usr/local/lib/python3.8/dist-packages (from pandas==1.2.4->pandas-tfrecords==0.1.5) (2.8.2)\n", + "Requirement already satisfied: pytz>=2017.3 in /usr/local/lib/python3.8/dist-packages (from pandas==1.2.4->pandas-tfrecords==0.1.5) (2021.1)\n", + "Requirement already satisfied: aiobotocore>=1.0.1 in /usr/local/lib/python3.8/dist-packages (from s3fs==2021.6.0->pandas-tfrecords==0.1.5) (1.4.1)\n", + "Requirement already satisfied: fsspec==2021.06.0 in /usr/local/lib/python3.8/dist-packages (from s3fs==2021.6.0->pandas-tfrecords==0.1.5) (2021.6.0)\n", + "Requirement already satisfied: wrapt~=1.12.1 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (1.12.1)\n", + "Requirement already satisfied: keras-preprocessing~=1.1.2 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (1.1.2)\n", + "Requirement already satisfied: astunparse~=1.6.3 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (1.6.3)\n", + "Requirement already satisfied: google-pasta~=0.2 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (0.2.0)\n", + "Requirement already satisfied: typing-extensions~=3.7.4 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (3.7.4.3)\n", + "Requirement already satisfied: flatbuffers~=1.12.0 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (1.12)\n", + "Requirement already satisfied: gast==0.4.0 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (0.4.0)\n", + "Requirement already satisfied: tensorboard~=2.5 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (2.5.0)\n", + "Requirement already satisfied: grpcio~=1.34.0 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (1.34.1)\n", + "Requirement already satisfied: tensorflow-estimator<2.6.0,>=2.5.0rc0 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (2.5.0)\n", + "Requirement already satisfied: six~=1.15.0 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (1.15.0)\n", + "Requirement already satisfied: opt-einsum~=3.3.0 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (3.3.0)\n", + "Requirement already satisfied: h5py~=3.1.0 in /usr/local/lib/python3.8/dist-packages 
(from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (3.1.0)\n", + "Requirement already satisfied: absl-py~=0.10 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (0.12.0)\n", + "Requirement already satisfied: termcolor~=1.1.0 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (1.1.0)\n", + "Requirement already satisfied: keras-nightly~=2.5.0.dev in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (2.5.0.dev2021032900)\n", + "Requirement already satisfied: wheel~=0.35 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (0.36.2)\n", + "Requirement already satisfied: protobuf>=3.9.2 in /usr/local/lib/python3.8/dist-packages (from tensorflow==2.5.0->pandas-tfrecords==0.1.5) (3.17.3)\n", + "Requirement already satisfied: botocore<1.20.107,>=1.20.106 in /usr/local/lib/python3.8/dist-packages (from aiobotocore>=1.0.1->s3fs==2021.6.0->pandas-tfrecords==0.1.5) (1.20.106)\n", + "Requirement already satisfied: aioitertools>=0.5.1 in /usr/local/lib/python3.8/dist-packages (from aiobotocore>=1.0.1->s3fs==2021.6.0->pandas-tfrecords==0.1.5) (0.8.0)\n", + "Requirement already satisfied: aiohttp>=3.3.1 in /usr/local/lib/python3.8/dist-packages (from aiobotocore>=1.0.1->s3fs==2021.6.0->pandas-tfrecords==0.1.5) (3.7.4.post0)\n", + "Requirement already satisfied: multidict<7.0,>=4.5 in /usr/local/lib/python3.8/dist-packages (from aiohttp>=3.3.1->aiobotocore>=1.0.1->s3fs==2021.6.0->pandas-tfrecords==0.1.5) (5.1.0)\n", + "Requirement already satisfied: async-timeout<4.0,>=3.0 in /usr/local/lib/python3.8/dist-packages (from aiohttp>=3.3.1->aiobotocore>=1.0.1->s3fs==2021.6.0->pandas-tfrecords==0.1.5) (3.0.1)\n", + "Requirement already satisfied: attrs>=17.3.0 in /usr/local/lib/python3.8/dist-packages (from aiohttp>=3.3.1->aiobotocore>=1.0.1->s3fs==2021.6.0->pandas-tfrecords==0.1.5) (21.2.0)\n", + "Requirement already satisfied: chardet<5.0,>=2.0 in /usr/lib/python3/dist-packages (from aiohttp>=3.3.1->aiobotocore>=1.0.1->s3fs==2021.6.0->pandas-tfrecords==0.1.5) (3.0.4)\n", + "Requirement already satisfied: yarl<2.0,>=1.0 in /usr/local/lib/python3.8/dist-packages (from aiohttp>=3.3.1->aiobotocore>=1.0.1->s3fs==2021.6.0->pandas-tfrecords==0.1.5) (1.6.3)\n", + "Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /usr/local/lib/python3.8/dist-packages/jmespath-0.10.0-py3.8.egg (from botocore<1.20.107,>=1.20.106->aiobotocore>=1.0.1->s3fs==2021.6.0->pandas-tfrecords==0.1.5) (0.10.0)\n", + "Requirement already satisfied: urllib3<1.27,>=1.25.4 in /usr/local/lib/python3.8/dist-packages (from botocore<1.20.107,>=1.20.106->aiobotocore>=1.0.1->s3fs==2021.6.0->pandas-tfrecords==0.1.5) (1.26.6)\n", + "Requirement already satisfied: tensorboard-plugin-wit>=1.6.0 in /usr/local/lib/python3.8/dist-packages (from tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (1.8.0)\n", + "Requirement already satisfied: tensorboard-data-server<0.7.0,>=0.6.0 in /usr/local/lib/python3.8/dist-packages (from tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (0.6.1)\n", + "Requirement already satisfied: requests<3,>=2.21.0 in /usr/local/lib/python3.8/dist-packages (from tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (2.26.0)\n", + "Requirement already satisfied: google-auth<2,>=1.6.3 in /usr/local/lib/python3.8/dist-packages (from tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (1.33.1)\n", + "Requirement already satisfied: 
setuptools>=41.0.0 in /usr/local/lib/python3.8/dist-packages (from tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (57.4.0)\n", + "Requirement already satisfied: werkzeug>=0.11.15 in /usr/local/lib/python3.8/dist-packages (from tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (2.0.1)\n", + "Requirement already satisfied: markdown>=2.6.8 in /usr/local/lib/python3.8/dist-packages (from tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (3.3.4)\n", + "Requirement already satisfied: google-auth-oauthlib<0.5,>=0.4.1 in /usr/local/lib/python3.8/dist-packages (from tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (0.4.4)\n", + "Requirement already satisfied: pyasn1-modules>=0.2.1 in /usr/local/lib/python3.8/dist-packages (from google-auth<2,>=1.6.3->tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (0.2.8)\n", + "Requirement already satisfied: rsa<5,>=3.1.4 in /usr/local/lib/python3.8/dist-packages (from google-auth<2,>=1.6.3->tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (4.7.2)\n", + "Requirement already satisfied: cachetools<5.0,>=2.0.0 in /usr/local/lib/python3.8/dist-packages (from google-auth<2,>=1.6.3->tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (4.2.2)\n", + "Requirement already satisfied: requests-oauthlib>=0.7.0 in /usr/local/lib/python3.8/dist-packages (from google-auth-oauthlib<0.5,>=0.4.1->tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (1.3.0)\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Requirement already satisfied: pyasn1<0.5.0,>=0.4.6 in /usr/local/lib/python3.8/dist-packages (from pyasn1-modules>=0.2.1->google-auth<2,>=1.6.3->tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (0.4.8)\n", + "Requirement already satisfied: charset-normalizer~=2.0.0 in /usr/local/lib/python3.8/dist-packages (from requests<3,>=2.21.0->tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (2.0.3)\n", + "Requirement already satisfied: certifi>=2017.4.17 in /usr/local/lib/python3.8/dist-packages (from requests<3,>=2.21.0->tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (2021.5.30)\n", + "Requirement already satisfied: idna<4,>=2.5 in /usr/local/lib/python3.8/dist-packages (from requests<3,>=2.21.0->tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (3.2)\n", + "Requirement already satisfied: oauthlib>=3.0.0 in /usr/local/lib/python3.8/dist-packages (from requests-oauthlib>=0.7.0->google-auth-oauthlib<0.5,>=0.4.1->tensorboard~=2.5->tensorflow==2.5.0->pandas-tfrecords==0.1.5) (3.1.1)\n", + "\u001b[33mWARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv\u001b[0m\n", + "\u001b[33mWARNING: You are using pip version 21.2.1; however, version 21.2.4 is available.\n", + "You should consider upgrading via the '/usr/bin/python -m pip install --upgrade pip' command.\u001b[0m\n" + ] + } + ], + "source": [ + "!pip install pandas-tfrecords==0.1.5" + ] + }, + { + "cell_type": "markdown", + "id": "9a8f4dcd", + "metadata": {}, + "source": [ + "## Create a Synthetic Dataset" + ] + }, + { + "cell_type": "markdown", + "id": "243a5cbd", + "metadata": {}, + "source": [ + "First, we will create a synthetic dataset. Afterwards, we will convert the synthetic data to a tfrecord file. 
The synthetic dataset contains `continuous features`, `categorical features`, `continuous features in a list with variable length`, `categorical features in a list with variable length`, and the `label`.\n", + "\n", + "The list features have variable length; such list features are often used in session-based recommender systems. For example, a session can be represented by its last page views, and sessions have different lengths." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "58949777", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import pandas as pd" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "deeafde3", + "metadata": {}, + "outputs": [], + "source": [ + "def create_synthetic_df(\n", + "    N_CONT_FEATURES, N_CAT_FEATURES, N_CONT_LIST_FEATURES, N_CAT_LIST_FEATURES, N_ROWS\n", + "):\n", + "    dict_features = {}\n", + "    for icont in range(N_CONT_FEATURES):\n", + "        dict_features[\"cont\" + str(icont)] = np.random.uniform(-1, 1, size=N_ROWS)\n", + "    for icat in range(N_CAT_FEATURES):\n", + "        dict_features[\"cat\" + str(icat)] = np.random.choice(list(range(10)), size=N_ROWS)\n", + "    for icontlist in range(N_CONT_LIST_FEATURES):\n", + "        feature_list = []\n", + "        for irow in range(N_ROWS):\n", + "            n_elements = np.random.choice(list(range(20)))\n", + "            feature_list.append(np.random.uniform(-1, 1, size=n_elements).tolist())\n", + "        dict_features[\"cont_list\" + str(icontlist)] = feature_list\n", + "    for icatlist in range(N_CAT_LIST_FEATURES):\n", + "        feature_list = []\n", + "        for irow in range(N_ROWS):\n", + "            n_elements = np.random.choice(list(range(20)))\n", + "            feature_list.append(np.random.choice(list(range(10)), size=n_elements).tolist())\n", + "        dict_features[\"cat_list\" + str(icatlist)] = feature_list\n", + "    dict_features[\"label\"] = np.random.choice(list(range(2)), size=N_ROWS)\n", + "    df = pd.DataFrame(dict_features)\n", + "    return df" + ] + }, + { + "cell_type": "markdown", + "id": "fda49c3f", + "metadata": {}, + "source": [ + "We can configure the size of the dataset and the number of features of each type. As this is just an example, we use only 20,000 rows." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "0b141d03", + "metadata": {}, + "outputs": [], + "source": [ + "N_ROWS = 20000\n", + "N_CONT_FEATURES = 5\n", + "N_CAT_FEATURES = 7\n", + "N_CONT_LIST_FEATURES = 2\n", + "N_CAT_LIST_FEATURES = 3" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "6616a87b", + "metadata": {}, + "outputs": [], + "source": [ + "df = create_synthetic_df(\n", + "    N_CONT_FEATURES, N_CAT_FEATURES, N_CONT_LIST_FEATURES, N_CAT_LIST_FEATURES, N_ROWS\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "22d66e48", + "metadata": {}, + "source": [ + "We can take a look at the dataset." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "e023dca6", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
" ], + "text/plain": [ + " cont0 cont1 cont2 cont3 cont4 cat0 cat1 cat2 cat3 \\\n", + "0 -0.121288 -0.357718 -0.431527 -0.627783 0.191935 6 3 9 0 \n", + "1 -0.886083 -0.689626 -0.799476 -0.756402 -0.530262 7 6 1 4 \n", + "2 0.635865 -0.678903 -0.753119 0.295134 0.006677 8 1 5 9 \n", + "3 0.274878 0.534065 0.766480 -0.117808 0.939361 2 7 8 7 \n", + "4 -0.519931 -0.692767 0.405410 0.555309 0.494168 9 4 3 2 \n", + "\n", + " cat4 cat5 cat6 cont_list0 \\\n", + "0 4 7 0 [0.1692354035839727, -0.46975096709111264, 0.3... \n", + "1 1 2 6 [-0.8062752683244594, -0.4870759806510412, -0.... \n", + "2 0 4 3 [0.3535400387801406, 0.04545628080492481, 0.66... \n", + "3 8 8 7 [-0.7778180828602796, -0.7450791973905229, 0.7... \n", + "4 9 0 0 [0.9219042955449821, -0.2829414261782328, 0.74... \n", + "\n", + " cont_list1 \\\n", + "0 [-0.019300394422056444, -0.8409590858237292, 0... \n", + "1 [0.6996719923900598, -0.8914946563343444, -0.8... \n", + "2 [-0.002535584894796994, -0.46898774509715535, ... \n", + "3 [-0.28844878640240035, -0.2848391337111471, 0.... \n", + "4 [-0.31399014027475736, 0.20048294938775935, -0... \n", + "\n", + " cat_list0 \\\n", + "0 [] \n", + "1 [0, 0, 0, 5, 0, 9, 8, 6, 3, 1, 2, 3, 7, 5, 8, 5] \n", + "2 [6, 4, 9, 7, 4, 0, 7] \n", + "3 [0, 3, 3, 8, 0, 5, 6, 0, 6, 1, 1, 7, 6, 8, 8] \n", + "4 [] \n", + "\n", + " cat_list1 \\\n", + "0 [] \n", + "1 [6, 7, 9, 1, 1, 0, 2, 7, 8, 9, 4, 0, 9, 2, 7, ... \n", + "2 [2, 8, 9] \n", + "3 [3, 5, 0, 1, 7, 9, 9, 1] \n", + "4 [6, 7, 4, 1, 4, 4, 1, 5] \n", + "\n", + " cat_list2 label \n", + "0 [1, 4, 7, 7, 2, 6, 7, 9, 7, 1, 9, 8, 9] 0 \n", + "1 [3, 5, 2, 3, 9, 2] 0 \n", + "2 [4] 0 \n", + "3 [7, 5, 7, 8, 8, 0, 9, 0, 3, 9, 0, 0, 0, 2, 0, ... 1 \n", + "4 [2, 2, 7, 6, 0, 1, 0, 9, 8, 8, 7, 7] 0 " + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "6a49b022", + "metadata": {}, + "outputs": [], + "source": [ + "CONTINUOUS_COLUMNS = [\"cont\" + str(i) for i in range(N_CONT_FEATURES)]\n", + "CATEGORICAL_COLUMNS = [\"cat\" + str(i) for i in range(N_CAT_FEATURES)]\n", + "CONTINUOUS_LIST_COLUMNS = [\"cont_list\" + str(i) for i in range(N_CONT_LIST_FEATURES)]\n", + "CATEGORICAL_LIST_COLUMNS = [\"cat_list\" + str(i) for i in range(N_CAT_LIST_FEATURES)]\n", + "LABEL_COLUMNS = [\"label\"]" + ] + }, + { + "cell_type": "markdown", + "id": "bb33cb9b", + "metadata": {}, + "source": [ + "## Convert the Synthetic Dataset into TFRecords" + ] + }, + { + "cell_type": "markdown", + "id": "5a8b05b0", + "metadata": {}, + "source": [ + "After creating the synthetic dataset, we can store it as tfrecords."
+ ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "055a8dae", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2021-09-14 13:44:22.062009: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0\n" + ] + } + ], + "source": [ + "import tensorflow as tf" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "f8f502ff", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import multiprocessing as mp\n", + "from itertools import repeat\n", + "\n", + "\n", + "def transform_tfrecords(\n", + " df,\n", + " PATH,\n", + " CONTINUOUS_COLUMNS,\n", + " CATEGORICAL_COLUMNS,\n", + " CONTINUOUS_LIST_COLUMNS,\n", + " CATEGORICAL_LIST_COLUMNS,\n", + " LABEL_COLUMNS,\n", + "):\n", + " write_dir = os.path.dirname(PATH)\n", + " if not os.path.exists(write_dir):\n", + " os.makedirs(write_dir)\n", + " file_idx, example_idx = 0, 0\n", + " writer = get_writer(write_dir, file_idx)\n", + " column_names = [\n", + " CONTINUOUS_COLUMNS,\n", + " CATEGORICAL_COLUMNS + LABEL_COLUMNS,\n", + " CONTINUOUS_LIST_COLUMNS,\n", + " CATEGORICAL_LIST_COLUMNS,\n", + " ]\n", + " with mp.Pool(8, pool_initializer, column_names) as pool:\n", + " data = []\n", + " for col_names in column_names:\n", + " if len(col_names) == 0:\n", + " data.append(repeat(None))\n", + " else:\n", + " data.append(df[col_names].values)\n", + " data = zip(*data)\n", + " record_map = pool.imap(build_and_serialize_example, data, chunksize=200)\n", + " for record in record_map:\n", + " writer.write(record)\n", + " example_idx += 1\n", + " writer.close()\n", + "\n", + "\n", + "def pool_initializer(num_cols, cat_cols, num_list_cols, cat_list_cols):\n", + " global numeric_columns\n", + " global categorical_columns\n", + " global numeric_list_columns\n", + " global categorical_list_columns\n", + " numeric_columns = num_cols\n", + " categorical_columns = cat_cols\n", + " numeric_list_columns = num_list_cols\n", + " categorical_list_columns = cat_list_cols\n", + "\n", + "\n", + "def build_and_serialize_example(data):\n", + " numeric_values, categorical_values, numeric_list_values, categorical_list_values = data\n", + " feature = {}\n", + " if numeric_values is not None:\n", + " feature.update(\n", + " {\n", + " col: tf.train.Feature(float_list=tf.train.FloatList(value=[val]))\n", + " for col, val in zip(numeric_columns, numeric_values)\n", + " }\n", + " )\n", + " if categorical_values is not None:\n", + " feature.update(\n", + " {\n", + " col: tf.train.Feature(int64_list=tf.train.Int64List(value=[val]))\n", + " for col, val in zip(categorical_columns, categorical_values)\n", + " }\n", + " )\n", + " if numeric_list_values is not None:\n", + " feature.update(\n", + " {\n", + " col: tf.train.Feature(float_list=tf.train.FloatList(value=val))\n", + " for col, val in zip(numeric_list_columns, numeric_list_values)\n", + " }\n", + " )\n", + " if categorical_list_values is not None:\n", + " feature.update(\n", + " {\n", + " col: tf.train.Feature(int64_list=tf.train.Int64List(value=val))\n", + " for col, val in zip(categorical_list_columns, categorical_list_values)\n", + " }\n", + " )\n", + " return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()\n", + "\n", + "\n", + "def get_writer(write_dir, file_idx):\n", + " filename = str(file_idx).zfill(5) + \".tfrecords\"\n", + " return tf.io.TFRecordWriter(os.path.join(write_dir, filename))" + ] + }, + { + "cell_type": "markdown", + "id": 
"f0430ce5", + "metadata": {}, + "source": [ + "We define the output path." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "0ca623b3", + "metadata": {}, + "outputs": [], + "source": [ + "PATH = \"/raid/tfrecord-test/\"" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "2619480a", + "metadata": {}, + "outputs": [], + "source": [ + "!rm -rf $PATH\n", + "!mkdir $PATH" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "b88f1b42", + "metadata": {}, + "outputs": [], + "source": [ + "transform_tfrecords(\n", + " df,\n", + " PATH,\n", + " CONTINUOUS_COLUMNS,\n", + " CATEGORICAL_COLUMNS,\n", + " CONTINUOUS_LIST_COLUMNS,\n", + " CATEGORICAL_LIST_COLUMNS,\n", + " LABEL_COLUMNS,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "25ad1044", + "metadata": {}, + "source": [ + "We can check the file." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "31362c7e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "00000.tfrecords\r\n" + ] + } + ], + "source": [ + "!ls $PATH" + ] + }, + { + "cell_type": "markdown", + "id": "69fc385f", + "metadata": {}, + "source": [ + "## Convert TFRecords to parquet files" + ] + }, + { + "cell_type": "markdown", + "id": "3aafe8a0", + "metadata": {}, + "source": [ + "Now, we have a dataset in the tfrecords format. Let's use the `convert_tfrecords_to_parquet` function to convert a tfrecord file into parquet." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "62fa679c", + "metadata": {}, + "outputs": [], + "source": [ + "import glob\n", + "\n", + "from nvtabular.framework_utils.tensorflow.tfrecords_to_parquet import convert_tfrecords_to_parquet" + ] + }, + { + "cell_type": "markdown", + "id": "1e59596b", + "metadata": {}, + "source": [ + "Let's select all TFRecords in the folder." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "fd930951", + "metadata": {}, + "outputs": [], + "source": [ + "filenames = glob.glob(PATH + \"/*.tfrecords\")" + ] + }, + { + "cell_type": "markdown", + "id": "3eab6554", + "metadata": {}, + "source": [ + "Let's call the `convert_tfrecords_to_parquet`.

\n", + "Some details about the parameters:\n", + "* `compression_type` is the compression type of the tfrecords. Options: `\"\"` (no compression), `\"ZLIB\"`, or `\"GZIP\"`\n", + "* `chunks` defines how many data points per `parquet` file should be saved. It splits a tfrecords into multiple parquet files.\n", + "* `convert_lists` defines, if feature lists should be converted into muliple feature columns. Even single dataframe series are 1 dimensional arrays when converted back from tfrecords to parquet. " + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "d249b965", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['/raid/tfrecord-test/00000.tfrecords']" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "filenames" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "854f2aa3", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2021-09-14 13:44:35.832870: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1\n", + "2021-09-14 13:44:35.834890: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: \n", + "pciBusID: 0000:0b:00.0 name: Tesla V100-SXM2-32GB computeCapability: 7.0\n", + "coreClock: 1.53GHz coreCount: 80 deviceMemorySize: 31.75GiB deviceMemoryBandwidth: 836.37GiB/s\n", + "2021-09-14 13:44:35.834919: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0\n", + "2021-09-14 13:44:35.834964: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublas.so.11\n", + "2021-09-14 13:44:35.834998: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublasLt.so.11\n", + "2021-09-14 13:44:35.835031: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcufft.so.10\n", + "2021-09-14 13:44:35.835064: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcurand.so.10\n", + "2021-09-14 13:44:35.835111: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcusolver.so.11\n", + "2021-09-14 13:44:35.835144: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcusparse.so.11\n", + "2021-09-14 13:44:35.835180: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudnn.so.8\n", + "2021-09-14 13:44:35.838915: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1871] Adding visible gpu devices: 0\n", + "2021-09-14 13:44:35.839664: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: AVX2 FMA\n", + "To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", + "2021-09-14 13:44:35.848336: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: \n", + "pciBusID: 0000:0b:00.0 name: Tesla V100-SXM2-32GB computeCapability: 7.0\n", + "coreClock: 1.53GHz coreCount: 80 deviceMemorySize: 31.75GiB deviceMemoryBandwidth: 836.37GiB/s\n", + "2021-09-14 13:44:35.852172: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1871] Adding 
visible gpu devices: 0\n", + "2021-09-14 13:44:35.852216: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0\n", + "2021-09-14 13:44:37.236098: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1258] Device interconnect StreamExecutor with strength 1 edge matrix:\n", + "2021-09-14 13:44:37.236140: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1264] 0 \n", + "2021-09-14 13:44:37.236150: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1277] 0: N \n", + "2021-09-14 13:44:37.241900: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1418] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 30677 MB memory) -> physical GPU (device: 0, name: Tesla V100-SXM2-32GB, pci bus id: 0000:0b:00.0, compute capability: 7.0)\n", + "2021-09-14 13:44:37.301800: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)\n", + "2021-09-14 13:44:37.305671: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2195025000 Hz\n", + "20000it [00:12, 1558.83it/s]\n" + ] + } + ], + "source": [ + "convert_tfrecords_to_parquet(\n", + "    filenames=filenames, output_dir=PATH, compression_type=\"\", chunks=1000, convert_lists=True\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "c6a3881c", + "metadata": {}, + "source": [ + "## Let's take a look" + ] + }, + { + "cell_type": "markdown", + "id": "897c4ea3", + "metadata": {}, + "source": [ + "We can see that `convert_tfrecords_to_parquet` created one parquet file per `tfrecord` file, writing the data chunk by chunk." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "dab31264", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['/raid/tfrecord-test/00000.parquet']" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "filenames = glob.glob(PATH + \"/*.parquet\")\n", + "filenames" + ] + }, + { + "cell_type": "markdown", + "id": "453e26eb", + "metadata": {}, + "source": [ + "If we load the first file, we can see that it has the same structure as our original synthetic dataset." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "0bd30a89", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
" + ], + "text/plain": [ + " cat0 cat1 cat2 cat3 cat4 cat5 cat6 \\\n", + "0 6 3 9 0 4 7 0 \n", + "1 7 6 1 4 1 2 6 \n", + "2 8 1 5 9 0 4 3 \n", + "3 2 7 8 7 8 8 7 \n", + "4 9 4 3 2 9 0 0 \n", + "\n", + " cat_list0 \\\n", + "0 [] \n", + "1 [0, 0, 0, 5, 0, 9, 8, 6, 3, 1, 2, 3, 7, 5, 8, 5] \n", + "2 [6, 4, 9, 7, 4, 0, 7] \n", + "3 [0, 3, 3, 8, 0, 5, 6, 0, 6, 1, 1, 7, 6, 8, 8] \n", + "4 [] \n", + "\n", + " cat_list1 \\\n", + "0 [] \n", + "1 [6, 7, 9, 1, 1, 0, 2, 7, 8, 9, 4, 0, 9, 2, 7, ... \n", + "2 [2, 8, 9] \n", + "3 [3, 5, 0, 1, 7, 9, 9, 1] \n", + "4 [6, 7, 4, 1, 4, 4, 1, 5] \n", + "\n", + " cat_list2 cont0 cont1 \\\n", + "0 [1, 4, 7, 7, 2, 6, 7, 9, 7, 1, 9, 8, 9] -0.121288 -0.357718 \n", + "1 [3, 5, 2, 3, 9, 2] -0.886083 -0.689626 \n", + "2 [4] 0.635865 -0.678903 \n", + "3 [7, 5, 7, 8, 8, 0, 9, 0, 3, 9, 0, 0, 0, 2, 0, ... 0.274878 0.534065 \n", + "4 [2, 2, 7, 6, 0, 1, 0, 9, 8, 8, 7, 7] -0.519931 -0.692767 \n", + "\n", + " cont2 cont3 cont4 \\\n", + "0 -0.431527 -0.627783 0.191935 \n", + "1 -0.799476 -0.756402 -0.530262 \n", + "2 -0.753119 0.295134 0.006677 \n", + "3 0.766480 -0.117808 0.939361 \n", + "4 0.405410 0.555309 0.494168 \n", + "\n", + " cont_list0 \\\n", + "0 [0.16923541, -0.46975097, 0.36240318, -0.05831... \n", + "1 [-0.80627525, -0.48707598, -0.6516318, 0.87470... \n", + "2 [0.35354003, 0.04545628, 0.6673933, 0.4735813,... \n", + "3 [-0.7778181, -0.7450792, 0.7001909, -0.7610098... \n", + "4 [0.92190427, -0.28294143, 0.7465968, 0.5406436... \n", + "\n", + " cont_list1 label \n", + "0 [-0.019300394, -0.8409591, 0.6081534, 0.050789... 0 \n", + "1 [0.699672, -0.89149463, -0.8134837, -0.9065274... 0 \n", + "2 [-0.0025355848, -0.46898773, -0.07290607, -0.8... 0 \n", + "3 [-0.28844878, -0.28483912, 0.18376812, 0.32782... 1 \n", + "4 [-0.31399015, 0.20048295, -0.8439063, -0.46556... 0 " + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.read_parquet(filenames[0])\n", + "df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "b2ce99e0", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(20000, 18)" + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.shape" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/nvtabular/framework_utils/tensorflow/tfrecords_to_parquet.py b/nvtabular/framework_utils/tensorflow/tfrecords_to_parquet.py new file mode 100644 index 00000000000..682f554c697 --- /dev/null +++ b/nvtabular/framework_utils/tensorflow/tfrecords_to_parquet.py @@ -0,0 +1,111 @@ +# Copyright (c) 2021, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import gc
+
+import cudf
+import tensorflow as tf
+from cudf.core.column.lists import is_list_dtype
+from cudf.io.parquet import ParquetWriter
+from pandas_tfrecords.from_tfrecords import _get_feature_type, read_example
+from tqdm import tqdm
+
+
+def convert_tfrecords_to_parquet(
+    filenames, output_dir, compression_type="", chunks=100000, convert_lists=False
+):
+    """
+    Converts tfrecord files to the parquet file format
+
+    Parameters
+    ----------
+    filenames: list
+        List of tfrecord filenames that should be converted
+    output_dir: str
+        Output path where the parquet files will be stored
+    compression_type: str
+        Compression type of the tfrecords. Options: `""` (no compression), `"ZLIB"`, or `"GZIP"`
+    chunks: int
+        Number of rows that are buffered before a chunk is written to the parquet file
+    convert_lists: Boolean
+        Features read back from tfrecords are lists. Set True to convert lists with a fixed
+        length to individual columns in the output dataframe
+
+    """
+
+    for file in filenames:
+        dataset = tf.data.TFRecordDataset(file, compression_type=compression_type)
+        features = _detect_schema(dataset)
+        parser = read_example(features)
+        parsed = dataset.map(parser)
+        _to_parquet(parsed, file, output_dir, chunks, convert_lists)
+
+
+def _detect_schema(dataset):
+    # inspired by
+    # https://github.com/schipiga/pandas-tfrecords/blob/master/pandas_tfrecords/from_tfrecords.py
+    # Reads the first serialized example and infers a feature spec from its context features.
+    features = {}
+
+    serialized = next(iter(dataset.map(lambda serialized: serialized)))
+    seq_ex = tf.train.SequenceExample.FromString(serialized.numpy())
+
+    if seq_ex.context.feature:
+        for key, feature in seq_ex.context.feature.items():
+            features[key] = tf.io.FixedLenSequenceFeature(
+                (), _get_feature_type(feature=feature), allow_missing=True
+            )
+
+    return features
+
+
+def _to_parquet(tfrecords, file, output_dir, chunks, convert_lists):
+    # Streams the parsed examples into one parquet file per tfrecord file,
+    # writing every `chunks` rows to keep host memory usage bounded.
+    out = []
+    i = 0
+    w = ParquetWriter(output_dir + file.split("/")[-1].split(".")[0] + ".parquet")
+    for tfrecord in tqdm(tfrecords):
+        row = {key: val.numpy() for key, val in tfrecord.items()}
+        out.append(row)
+        i += 1
+        if i == chunks:
+            df = cudf.DataFrame(out)
+            if convert_lists:
+                df = _convert_lists(df)
+            w.write_table(df)
+            i = 0
+            out = []
+            del df
+            gc.collect()
+    # Write the remaining rows that did not fill a full chunk
+    if len(out) > 0:
+        df = cudf.DataFrame(out)
+        if convert_lists:
+            df = _convert_lists(df)
+        w.write_table(df)
+        del df
+        gc.collect()
+    w.close()
+
+
+def _convert_lists(df):
+    # Expands list columns that have a fixed, non-zero length into individual
+    # scalar columns; variable-length list columns are left unchanged.
+    for col in df.columns:
+        if is_list_dtype(df[col]):
+            series_length = df[col].list.len()
+            if series_length.var() == 0 and series_length.min() > 0:
+                if series_length.max() == 1:
+                    # length-1 lists become a plain scalar column
+                    df[col] = df[col].list.get(0)
+                else:
+                    for i in range(series_length.max()):
+                        df[col + "_" + str(i)] = df[col].list.get(i)
+                    df.drop([col], axis=1, inplace=True)
+    return df
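+
+
+# Example usage (a minimal sketch; it mirrors the call in the accompanying
+# TFRecords-To-Parquet notebook, and the /raid/tfrecord-test/ path is only
+# an illustration):
+#
+#     import glob
+#
+#     filenames = glob.glob("/raid/tfrecord-test/*.tfrecords")
+#     convert_tfrecords_to_parquet(
+#         filenames=filenames,
+#         output_dir="/raid/tfrecord-test/",
+#         compression_type="",
+#         chunks=1000,
+#         convert_lists=True,
+#     )
+#
+# Note that `output_dir` is concatenated directly with the file stem, so it
+# should end with a trailing "/".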