Skip to content

Commit

Permalink
[FEATURE] KPO use K8S hook
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelauv committed May 16, 2022
1 parent cd9d935 commit 3adb871
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
convert_volume,
convert_volume_mount,
)
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar # type: ignore[attr-defined]
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
PodLaunchFailedException,
Expand Down Expand Up @@ -83,6 +84,8 @@ class KubernetesPodOperator(BaseOperator):
:class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator`, which
simplifies the authorization process.
:param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
for the Kubernetes cluster.
:param namespace: the namespace to run within kubernetes.
:param image: Docker image you wish to launch. Defaults to hub.docker.com,
but fully qualified URLS will point to custom repositories. (templated)
Expand Down Expand Up @@ -158,6 +161,7 @@ class KubernetesPodOperator(BaseOperator):
def __init__(
self,
*,
kubernetes_conn_id: Optional[str] = None, # 'kubernetes_default',
namespace: Optional[str] = None,
image: Optional[str] = None,
name: Optional[str] = None,
Expand Down Expand Up @@ -206,6 +210,7 @@ def __init__(
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
super().__init__(resources=None, **kwargs)

self.kubernetes_conn_id = kubernetes_conn_id
self.do_xcom_push = do_xcom_push
self.image = image
self.namespace = namespace
Expand Down Expand Up @@ -321,7 +326,10 @@ def pod_manager(self) -> PodManager:

@cached_property
def client(self) -> CoreV1Api:
# todo: use airflow Connection / hook to authenticate to the cluster
if self.kubernetes_conn_id:
hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
return hook.core_v1_client

kwargs: Dict[str, Any] = dict(
cluster_context=self.cluster_context,
config_file=self.config_file,
Expand Down

0 comments on commit 3adb871

Please sign in to comment.