
Commit

Update notebook (#155)
* Update notebook

By also writing with PyIceberg and removing Spark from the example

* Add some guards

* Oops
Fokko committed May 14, 2024
1 parent 8b126ca commit 7239f7d
Showing 1 changed file with 65 additions and 78 deletions.
143 changes: 65 additions & 78 deletions spark/notebooks/PyIceberg - Getting Started.ipynb
@@ -8,14 +8,6 @@
"![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)"
]
},
{
"cell_type": "markdown",
"id": "247fb2ab",
"metadata": {},
"source": [
"### [Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!](https://tabular.io/blog/docker-spark-and-iceberg/)"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -35,131 +27,126 @@
"source": [
"## Load NYC Taxi/Limousine Trip Data\n",
"\n",
"For this notebook, we will use the New York City Taxi and Limousine Commision Trip Record Data that's available on the AWS Open Data Registry. This contains data of trips taken by taxis and for-hire vehicles in New York City. We'll save this into an iceberg table called `taxis`."
"For this notebook, we will use the New York City Taxi and Limousine Commision Trip Record Data that's available on the AWS Open Data Registry. This contains data of trips taken by taxis and for-hire vehicles in New York City. We'll save this into an iceberg table called `taxis`.\n",
"\n",
"First, load the Parquet file using PyArrow:"
]
},
{
"cell_type": "markdown",
"id": "747bee98",
"cell_type": "code",
"execution_count": null,
"id": "1a890a18-6078-4574-8ade-7598ba91bf6b",
"metadata": {},
"outputs": [],
"source": [
"To be able to rerun the notebook several times, let's drop the table if it exists to start fresh."
"import pyarrow.parquet as pq\n",
"\n",
"tbl_taxis = pq.read_table('/home/iceberg/data/yellow_tripdata_2021-04.parquet')\n",
"tbl_taxis"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "84fd09a4",
"cell_type": "markdown",
"id": "63d2ac2a-7e21-4f5b-b357-a020797099fd",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"## Creating the table\n",
"\n",
"CREATE DATABASE IF NOT EXISTS nyc;"
"Next, create the namespace, and the `taxis` table from the schema that's derived from the Arrow schema:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "07bf9dc2",
"id": "9fddb808",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"DROP TABLE IF EXISTS nyc.taxis;"
"from pyiceberg.catalog import load_catalog\n",
"from pyiceberg.exceptions import NamespaceAlreadyExistsError\n",
"\n",
"cat = load_catalog('default')\n",
"\n",
"try:\n",
" cat.create_namespace('default')\n",
"except NamespaceAlreadyExistsError:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "363f815a",
"id": "430bd828-f856-4230-aff7-94274fbce96d",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"CREATE TABLE IF NOT EXISTS nyc.taxis (\n",
" VendorID bigint,\n",
" tpep_pickup_datetime timestamp,\n",
" tpep_dropoff_datetime timestamp,\n",
" passenger_count double,\n",
" trip_distance double,\n",
" RatecodeID double,\n",
" store_and_fwd_flag string,\n",
" PULocationID bigint,\n",
" DOLocationID bigint,\n",
" payment_type bigint,\n",
" fare_amount double,\n",
" extra double,\n",
" mta_tax double,\n",
" tip_amount double,\n",
" tolls_amount double,\n",
" improvement_surcharge double,\n",
" total_amount double,\n",
" congestion_surcharge double,\n",
" airport_fee double\n",
"from pyiceberg.exceptions import NoSuchTableError\n",
"\n",
"try:\n",
" cat.drop_table('default.taxis')\n",
"except NoSuchTableError:\n",
" pass\n",
"\n",
"tbl = cat.create_table(\n",
" 'default.taxis',\n",
" schema=tbl_taxis.schema\n",
")\n",
"USING iceberg\n",
"PARTITIONED BY (days(tpep_pickup_datetime))"
"\n",
"tbl"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "47645b52",
"cell_type": "markdown",
"id": "56818a92-12c6-4806-a700-3071b9b3753c",
"metadata": {},
"outputs": [],
"source": [
"%%sql\n",
"## Write the actual data into the table\n",
"\n",
"TRUNCATE TABLE nyc.taxis"
"This will create a new snapshot on the table:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9fddb808",
"id": "672a87b1-7132-489f-934c-8243016b20b4",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.appName(\"Jupyter\").getOrCreate()\n",
"tbl.overwrite(tbl_taxis)\n",
"\n",
"for filename in [\n",
" \"yellow_tripdata_2022-04.parquet\",\n",
" \"yellow_tripdata_2022-03.parquet\",\n",
" \"yellow_tripdata_2022-02.parquet\",\n",
" \"yellow_tripdata_2022-01.parquet\",\n",
" \"yellow_tripdata_2021-12.parquet\",\n",
"]:\n",
" df = spark.read.parquet(f\"/home/iceberg/data/{filename}\")\n",
" df.write.mode(\"append\").saveAsTable(\"nyc.taxis\")"
"tbl"
]
},
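To confirm that the overwrite produced a snapshot, you can inspect the table's snapshot metadata. The following is a minimal sketch, assuming PyIceberg exposes `current_snapshot()` and `history()` on the table object:

```python
# Minimal sketch (assumes Table.current_snapshot() and Table.history() are available).
snap = tbl.current_snapshot()
print(snap.snapshot_id, snap.summary)   # summary includes the operation (e.g. overwrite) and record/file counts

# Every write appends an entry to the table history.
for entry in tbl.history():
    print(entry.snapshot_id, entry.timestamp_ms)
```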
{
"cell_type": "markdown",
"id": "cffd2c03",
"id": "d87c4f8e-3d04-493b-9faf-292b39656a48",
"metadata": {},
"source": [
"## Load data into a PyArrow Dataframe\n",
"## Append more data\n",
"\n",
"We'll fetch the table using the REST catalog that comes with the setup."
"Let's append another month of data to the table:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "efee8252",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"id": "0a9b5f47-d696-4742-9b72-b4ea203bd8de",
"metadata": {},
"outputs": [],
"source": [
"from pyiceberg.catalog import load_catalog\n",
"from pyiceberg.expressions import GreaterThanOrEqual\n",
"tbl.append(pq.read_table('/home/iceberg/data/yellow_tripdata_2021-05.parquet'))\n",
"\n",
"tbl"
]
},
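Unlike an overwrite, an append only adds new data files, but it still produces a fresh snapshot. As a rough check (a sketch assuming the snapshot list is reachable via `tbl.metadata.snapshots`), the snapshot count should grow by one after each write:

```python
# Sketch: count snapshots after the append (assumes tbl.metadata.snapshots is exposed).
print(len(tbl.metadata.snapshots))       # one snapshot per write so far
print(tbl.current_snapshot().summary)    # the latest operation should be an append
```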
{
"cell_type": "markdown",
"id": "efa23071-8207-4c3d-86bc-db5bf4d768c0",
"metadata": {},
"source": [
"## Load data into a PyArrow Dataframe\n",
"\n",
"catalog = load_catalog('default')"
"We'll fetch the table using the REST catalog that comes with the setup."
]
},
{
@@ -169,9 +156,9 @@
"metadata": {},
"outputs": [],
"source": [
"tbl = catalog.load_table('nyc.taxis')\n",
"tbl = cat.load_table('default.taxis')\n",
"\n",
"sc = tbl.scan(row_filter=GreaterThanOrEqual(\"tpep_pickup_datetime\", \"2022-01-01T00:00:00.000000+00:00\"))"
"sc = tbl.scan(row_filter=\"tpep_pickup_datetime >= '2021-05-01T00:00:00.000000'\")"
]
},
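The scan itself is lazy; nothing is read until you materialize it. A minimal sketch, assuming the scan object's `to_arrow()` and `to_pandas()` helpers:

```python
# Sketch: materialize the filtered scan (assumes DataScan.to_arrow()/to_pandas()).
arrow_table = sc.to_arrow()      # PyArrow Table containing only the matching rows
print(arrow_table.num_rows)

df = sc.to_pandas()              # the same data as a pandas DataFrame
df.head()
```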
{
@@ -374,7 +361,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.16"
"version": "3.9.19"
}
},
"nbformat": 4,
