diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst index d35a80bf4b280..92b362a32eaad 100644 --- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst +++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst @@ -152,6 +152,12 @@ Bug Fixes * ``Even more typing in operators (template_fields/ext) (#20608)`` * ``Update documentation for provider December 2021 release (#20523)`` +Features +~~~~~~~~ + +* Can now derive kubernetes creds using airflow connection, through KubernetesHook + + 2.2.0 ..... diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index dad59b664d02f..89a069169e2ee 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -26,7 +26,7 @@ from kubernetes.client import CoreV1Api, models as k8s from airflow.exceptions import AirflowException -from airflow.kubernetes import kube_client, pod_generator +from airflow.kubernetes import pod_generator from airflow.kubernetes.pod_generator import PodGenerator from airflow.kubernetes.secret import Secret from airflow.models import BaseOperator @@ -42,6 +42,7 @@ convert_volume, convert_volume_mount, ) +from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook from airflow.providers.cncf.kubernetes.utils import xcom_sidecar from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase from airflow.settings import pod_mutation_hook @@ -136,7 +137,9 @@ class KubernetesPodOperator(BaseOperator): :param priority_class_name: priority class name for the launched Pod :param termination_grace_period: Termination grace period if task killed in UI, defaults to kubernetes default - + :type termination_grace_period: int + :param: kubernetes_conn_id: To retrieve creds for your k8s cluster from an Airflow connection + :type: str """ BASE_CONTAINER_NAME = 'base' @@ -198,12 +201,13 @@ def __init__( pod_runtime_info_envs: Optional[List[k8s.V1EnvVar]] = None, termination_grace_period: Optional[int] = None, configmaps: Optional[List[str]] = None, + kubernetes_conn_id: Optional[str] = None, **kwargs, ) -> None: if kwargs.get('xcom_push') is not None: raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead") super().__init__(resources=None, **kwargs) - + self.kubernetes_conn_id = kubernetes_conn_id self.do_xcom_push = do_xcom_push self.image = image self.namespace = namespace @@ -308,15 +312,17 @@ def pod_manager(self) -> PodManager: return PodManager(kube_client=self.client) @cached_property - def client(self) -> CoreV1Api: - # todo: use airflow Connection / hook to authenticate to the cluster - kwargs: Dict[str, Any] = dict( + def k8s_hook(self): + return KubernetesHook( + conn_id=self.kubernetes_conn_id, cluster_context=self.cluster_context, config_file=self.config_file, + in_cluster=self.in_cluster, ) - if self.in_cluster is not None: - kwargs.update(in_cluster=self.in_cluster) - return kube_client.get_kube_client(**kwargs) + + @cached_property + def client(self) -> CoreV1Api: + return self.k8s_hook.core_v1_client def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]: """Returns an already-running pod for this task instance if one exists."""