Skip to content

Commit

Permalink
Use KubernetesHook to create api client in KubernetesPodOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Feb 4, 2022
1 parent ab762a5 commit 5a9a2d8
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
6 changes: 6 additions & 0 deletions airflow/providers/cncf/kubernetes/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ Bug Fixes
* ``Even more typing in operators (template_fields/ext) (#20608)``
* ``Update documentation for provider December 2021 release (#20523)``
Features
~~~~~~~~

* Can now derive kubernetes creds using airflow connection, through KubernetesHook


2.2.0
.....

Expand Down
24 changes: 15 additions & 9 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from kubernetes.client import CoreV1Api, models as k8s

from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client, pod_generator
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.secret import Secret
from airflow.models import BaseOperator
Expand All @@ -42,6 +42,7 @@
convert_volume,
convert_volume_mount,
)
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase
from airflow.settings import pod_mutation_hook
Expand Down Expand Up @@ -136,7 +137,9 @@ class KubernetesPodOperator(BaseOperator):
:param priority_class_name: priority class name for the launched Pod
:param termination_grace_period: Termination grace period if task killed in UI,
defaults to kubernetes default
:type termination_grace_period: int
:param: kubernetes_conn_id: To retrieve creds for your k8s cluster from an Airflow connection
:type: str
"""

BASE_CONTAINER_NAME = 'base'
Expand Down Expand Up @@ -198,12 +201,13 @@ def __init__(
pod_runtime_info_envs: Optional[List[k8s.V1EnvVar]] = None,
termination_grace_period: Optional[int] = None,
configmaps: Optional[List[str]] = None,
kubernetes_conn_id: Optional[str] = None,
**kwargs,
) -> None:
if kwargs.get('xcom_push') is not None:
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
super().__init__(resources=None, **kwargs)

self.kubernetes_conn_id = kubernetes_conn_id
self.do_xcom_push = do_xcom_push
self.image = image
self.namespace = namespace
Expand Down Expand Up @@ -308,15 +312,17 @@ def pod_manager(self) -> PodManager:
return PodManager(kube_client=self.client)

@cached_property
def client(self) -> CoreV1Api:
# todo: use airflow Connection / hook to authenticate to the cluster
kwargs: Dict[str, Any] = dict(
def k8s_hook(self):
return KubernetesHook(
conn_id=self.kubernetes_conn_id,
cluster_context=self.cluster_context,
config_file=self.config_file,
in_cluster=self.in_cluster,
)
if self.in_cluster is not None:
kwargs.update(in_cluster=self.in_cluster)
return kube_client.get_kube_client(**kwargs)

@cached_property
def client(self) -> CoreV1Api:
return self.k8s_hook.core_v1_client

def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
"""Returns an already-running pod for this task instance if one exists."""
Expand Down

0 comments on commit 5a9a2d8

Please sign in to comment.