Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Argo Pipeline Integration #855

Merged
merged 32 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3cbe261
opencv package annotation
lazargugleta Nov 2, 2022
5776880
Merge branch 'LineaLabs:main' into main
lazargugleta Nov 2, 2022
1f56d22
Merge branch 'LineaLabs:main' into main
lazargugleta Nov 3, 2022
7e021e5
skip tests if cv2 module not installed
lazargugleta Nov 3, 2022
18e2460
Merge remote-tracking branch 'origin/main' into annotating-packages
lazargugleta Nov 3, 2022
da52bf8
remove prettify from test_opencv.py
lazargugleta Nov 3, 2022
abac606
Merge branch 'LineaLabs:main' into main
lazargugleta Nov 3, 2022
67dbf56
Add dask package annotation
lazargugleta Nov 7, 2022
b033f11
Merge branch 'LineaLabs:main' into main
lazargugleta Nov 7, 2022
bc0980c
update dask annotations and tests
lazargugleta Nov 16, 2022
1f429eb
Merge remote-tracking branch 'upstream/main'
lazargugleta Dec 5, 2022
c45ec21
remove to_feather/records
lazargugleta Dec 5, 2022
cdabb36
Merge remote-tracking branch 'upstream/main'
lazargugleta Dec 7, 2022
f085ca4
argo pipeline integration
lazargugleta Dec 7, 2022
74f9242
Merge remote-tracking branch 'upstream/main'
lazargugleta Dec 16, 2022
9b54c50
update to argo dag template and tasks configuration to suit argo
lazargugleta Dec 16, 2022
c22d0f3
Merge remote-tracking branch 'upstream/main'
lazargugleta Dec 19, 2022
3840f47
fix extra import argo_writer
lazargugleta Dec 19, 2022
c1807f9
merge changes
lazargugleta Dec 20, 2022
7f8a32e
change argo dag flavors, adapt to new task graph and output variables
lazargugleta Dec 20, 2022
f34a212
change the post call blocks for argo and kubeflow
lazargugleta Dec 20, 2022
af3200a
remove tests for argo
lazargugleta Dec 20, 2022
8284e32
move all kubernetes operations to _dag file for argo
lazargugleta Dec 21, 2022
61d33c5
add test snapshots
lazargugleta Dec 21, 2022
57b8d68
remove argo specific templates from tasks
lazargugleta Dec 21, 2022
7a3f774
update for runner
lazargugleta Dec 21, 2022
7185ff4
pickle communication enabled with folder creation
lazargugleta Dec 21, 2022
c2d3e2c
update snapshots for airflow
lazargugleta Dec 21, 2022
df881c7
remove stepperartifact flavor
lazargugleta Dec 23, 2022
1b3b0a2
tests failing again
lazargugleta Dec 23, 2022
3e46a5f
update runner
lazargugleta Dec 23, 2022
790bc28
remove step per artifact test from ambr
lazargugleta Dec 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
Next commit
update snapshots for airflow
  • Loading branch information
lazargugleta committed Dec 21, 2022
commit c2d3e2c758632c9663194919e94756155fc53c1d
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ def task_a():

a = a_module.get_a()

if not pathlib.Path("/tmp").joinpath("a").exists():
pathlib.Path("/tmp").joinpath("a").mkdir()
pickle.dump(a, open("/tmp/a/variable_a.pickle", "wb"))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ def task_a():

a = a_module.get_a()

if not pathlib.Path("/tmp").joinpath("a").exists():
pathlib.Path("/tmp").joinpath("a").mkdir()
pickle.dump(a, open("/tmp/a/variable_a.pickle", "wb"))


Expand Down
151 changes: 23 additions & 128 deletions tests/notebook/test_airflow.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,23 @@
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Generated module file: /Users/andrewcui/airflow/dags/z_module.py \n",
"Generated requirements file: /Users/andrewcui/airflow/dags/z_requirements.txt \n",
"Generated DAG file: /Users/andrewcui/airflow/dags/z_dag.py \n",
"Generated Docker file: /Users/andrewcui/airflow/dags/z_Dockerfile \n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"<lineapy.plugins.task.TaskGraph object at 0x118e26a60>\n",
"<lineapy.plugins.task.TaskGraph object at 0x118e26a60>\n"
"ename": "RuntimeError",
"evalue": "No context set. This could be because the LineaPy extension has not been loaded in your runtime environment. To load the extension, you need to launch your environment with the `lineapy` command, e.g., `lineapy jupyter notebook`. Or, to load the extension without relaunching the environment, execute `%load_ext lineapy` at the top of your session. Check https://docs.lineapy.org/en/main/guides/interfaces.html#jupyter-and-ipython for more detailed instructions.",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mRuntimeError\u001b[0m Traceback (most recent call last)",
"\u001b[1;32m/home/veloce/Documents/lineapy/tests/notebook/test_airflow.ipynb Cell 1\u001b[0m in \u001b[0;36m<cell line: 8>\u001b[0;34m()\u001b[0m\n\u001b[1;32m <a href='vscode-notebook-cell:/home/veloce/Documents/lineapy/tests/notebook/test_airflow.ipynb#W0sZmlsZQ%3D%3D?line=4'>5</a>\u001b[0m y \u001b[39m=\u001b[39m \u001b[39m10\u001b[39m\n\u001b[1;32m <a href='vscode-notebook-cell:/home/veloce/Documents/lineapy/tests/notebook/test_airflow.ipynb#W0sZmlsZQ%3D%3D?line=6'>7</a>\u001b[0m z \u001b[39m=\u001b[39m [x]\n\u001b[0;32m----> <a href='vscode-notebook-cell:/home/veloce/Documents/lineapy/tests/notebook/test_airflow.ipynb#W0sZmlsZQ%3D%3D?line=7'>8</a>\u001b[0m art \u001b[39m=\u001b[39m lineapy\u001b[39m.\u001b[39;49msave(z, \u001b[39m\"\u001b[39;49m\u001b[39mz\u001b[39;49m\u001b[39m\"\u001b[39;49m)\n\u001b[1;32m <a href='vscode-notebook-cell:/home/veloce/Documents/lineapy/tests/notebook/test_airflow.ipynb#W0sZmlsZQ%3D%3D?line=8'>9</a>\u001b[0m pipeline_dir \u001b[39m=\u001b[39m lineapy\u001b[39m.\u001b[39mto_pipeline([art\u001b[39m.\u001b[39mname], framework\u001b[39m=\u001b[39m\u001b[39m'\u001b[39m\u001b[39mAIRFLOW\u001b[39m\u001b[39m'\u001b[39m, output_dir\u001b[39m=\u001b[39m\u001b[39m\"\u001b[39m\u001b[39m~/airflow/dags\u001b[39m\u001b[39m\"\u001b[39m)\n",
"File \u001b[0;32m~/Documents/lineapy/lineapy/api/api.py:89\u001b[0m, in \u001b[0;36msave\u001b[0;34m(reference, name, storage_backend, **kwargs)\u001b[0m\n\u001b[1;32m 53\u001b[0m \u001b[39mdef\u001b[39;00m \u001b[39msave\u001b[39m(\n\u001b[1;32m 54\u001b[0m reference: \u001b[39mobject\u001b[39m,\n\u001b[1;32m 55\u001b[0m name: \u001b[39mstr\u001b[39m,\n\u001b[1;32m 56\u001b[0m storage_backend: Optional[ARTIFACT_STORAGE_BACKEND] \u001b[39m=\u001b[39m \u001b[39mNone\u001b[39;00m,\n\u001b[1;32m 57\u001b[0m \u001b[39m*\u001b[39m\u001b[39m*\u001b[39mkwargs,\n\u001b[1;32m 58\u001b[0m ) \u001b[39m-\u001b[39m\u001b[39m>\u001b[39m LineaArtifact:\n\u001b[1;32m 59\u001b[0m \u001b[39m\"\"\"\u001b[39;00m\n\u001b[1;32m 60\u001b[0m \u001b[39m Publishes the object to the Linea DB.\u001b[39;00m\n\u001b[1;32m 61\u001b[0m \n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 87\u001b[0m \u001b[39m information we have stored about the artifact (value, version), and other automation capabilities, such as :func:`to_pipeline`.\u001b[39;00m\n\u001b[1;32m 88\u001b[0m \u001b[39m \"\"\"\u001b[39;00m\n\u001b[0;32m---> 89\u001b[0m execution_context \u001b[39m=\u001b[39m get_context()\n\u001b[1;32m 90\u001b[0m executor \u001b[39m=\u001b[39m execution_context\u001b[39m.\u001b[39mexecutor\n\u001b[1;32m 91\u001b[0m db \u001b[39m=\u001b[39m executor\u001b[39m.\u001b[39mdb\n",
"File \u001b[0;32m~/Documents/lineapy/lineapy/execution/context.py:156\u001b[0m, in \u001b[0;36mget_context\u001b[0;34m()\u001b[0m\n\u001b[1;32m 154\u001b[0m \u001b[39mif\u001b[39;00m \u001b[39mnot\u001b[39;00m _current_context:\n\u001b[1;32m 155\u001b[0m track(ExceptionEvent(ErrorType\u001b[39m.\u001b[39mDATABASE, \u001b[39m\"\u001b[39m\u001b[39mNo context set\u001b[39m\u001b[39m\"\u001b[39m))\n\u001b[0;32m--> 156\u001b[0m \u001b[39mraise\u001b[39;00m \u001b[39mRuntimeError\u001b[39;00m(NO_CONTEXT_ERROR_MESSAGE)\n\u001b[1;32m 158\u001b[0m \u001b[39mreturn\u001b[39;00m _current_context\n",
"\u001b[0;31mRuntimeError\u001b[0m: No context set. This could be because the LineaPy extension has not been loaded in your runtime environment. To load the extension, you need to launch your environment with the `lineapy` command, e.g., `lineapy jupyter notebook`. Or, to load the extension without relaunching the environment, execute `%load_ext lineapy` at the top of your session. Check https://docs.lineapy.org/en/main/guides/interfaces.html#jupyter-and-ipython for more detailed instructions."
]
}
],
"source": [
"# NBVAL_IGNORE_OUTPUT\n",
"import lineapy\n",
"%load_ext lineapy\n",
"\n",
"x = 100\n",
"y = 10\n",
Expand All @@ -41,46 +37,10 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"id": "94daffd2-f19f-4b0e-bbe3-8e94024b405c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"def get_z():\n",
" x = 100\n",
" z = [x]\n",
" return z\n",
"\n",
"\n",
"def run_session_including_z():\n",
" # Given multiple artifacts, we need to save each right after\n",
" # its calculation to protect from any irrelevant downstream\n",
" # mutations (e.g., inside other artifact calculations)\n",
" import copy\n",
"\n",
" artifacts = dict()\n",
" z = get_z()\n",
" artifacts[\"z\"] = copy.deepcopy(z)\n",
" return artifacts\n",
"\n",
"\n",
"def run_all_sessions():\n",
" artifacts = dict()\n",
" artifacts.update(run_session_including_z())\n",
" return artifacts\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" # Edit this section to customize the behavior of artifacts\n",
" artifacts = run_all_sessions()\n",
" print(artifacts)\n",
"\n"
]
}
],
"outputs": [],
"source": [
"# NBVAL_IGNORE_OUTPUT\n",
"module_path = pipeline_dir/\"z_module.py\"\n",
Expand All @@ -89,88 +49,18 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"id": "225db4ee-c1ce-437a-b862-f2819bc05335",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"import pathlib\n",
"import pickle\n",
"\n",
"import z_module\n",
"from airflow import DAG\n",
"from airflow.operators.python_operator import PythonOperator\n",
"from airflow.utils.dates import days_ago\n",
"\n",
"\n",
"def task_setup():\n",
"\n",
" pickle_folder = pathlib.Path(\"/tmp\").joinpath(\"z\")\n",
" if not pickle_folder.exists():\n",
" pickle_folder.mkdir()\n",
"\n",
"\n",
"def task_teardown():\n",
"\n",
" pickle_files = pathlib.Path(\"/tmp\").joinpath(\"z\").glob(\"*.pickle\")\n",
" for f in pickle_files:\n",
" f.unlink()\n",
"\n",
"\n",
"def task_z():\n",
"\n",
" z = z_module.get_z()\n",
"\n",
" pickle.dump(z, open(\"/tmp/z/variable_z.pickle\", \"wb\"))\n",
"\n",
"\n",
"default_dag_args = {\n",
" \"owner\": \"airflow\",\n",
" \"retries\": 2,\n",
" \"start_date\": days_ago(1),\n",
"}\n",
"\n",
"with DAG(\n",
" dag_id=\"z_dag\",\n",
" schedule_interval=\"*/15 * * * *\",\n",
" max_active_runs=1,\n",
" catchup=False,\n",
" default_args=default_dag_args,\n",
") as dag:\n",
"\n",
" z = PythonOperator(\n",
" task_id=\"z_task\",\n",
" python_callable=task_z,\n",
" )\n",
"\n",
" setup = PythonOperator(\n",
" task_id=\"setup_task\",\n",
" python_callable=task_setup,\n",
" )\n",
"\n",
" teardown = PythonOperator(\n",
" task_id=\"teardown_task\",\n",
" python_callable=task_teardown,\n",
" )\n",
"\n",
" setup >> z\n",
"\n",
" z >> teardown\n",
"\n"
]
}
],
"outputs": [],
"source": [
"dag_path = pipeline_dir/\"z_dag.py\"\n",
"print(dag_path.read_text())"
]
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": null,
"id": "f046477f-d97e-4224-98bc-0682061271e2",
"metadata": {},
"outputs": [],
Expand All @@ -183,7 +73,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
Expand All @@ -197,7 +87,12 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
"version": "3.10.6"
},
"vscode": {
"interpreter": {
"hash": "97cc609b13305c559618ec78a438abc56230b9381f827f22d070313b9a1f3777"
}
}
},
"nbformat": 4,
Expand Down