From 8376f76aed573c8071c096d9b26c30b1916df0b3 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 16 Mar 2022 12:33:01 -0700 Subject: [PATCH] Remove RefreshConfiguration workaround for K8s token refreshing (#20759) A workaround was added (https://github.com/apache/airflow/pull/5731) to handle the refreshing of EKS tokens. It was necessary because of an upstream bug. It has since been fixed (https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) and released in v21.7.0 (https://github.com/kubernetes-client/python/blob/master/CHANGELOG.md#v2170). (cherry picked from commit 7bd165fbe2cbbfa8208803ec352c5d16ca2bd3ec) --- UPDATING.md | 6 + airflow/kubernetes/kube_client.py | 47 +- airflow/kubernetes/refresh_config.py | 124 ----- .../providers/cncf/kubernetes/CHANGELOG.rst | 231 +++++++- airflow/providers/cncf/kubernetes/__init__.py | 27 + .../backcompat/backwards_compat_converters.py | 22 +- .../cncf/kubernetes/backcompat/pod.py | 27 +- .../backcompat/pod_runtime_info_env.py | 18 +- .../cncf/kubernetes/backcompat/volume.py | 2 - .../kubernetes/backcompat/volume_mount.py | 4 - .../example_dags/example_kubernetes.py | 163 ++++++ .../cncf/kubernetes/hooks/kubernetes.py | 97 ++-- .../kubernetes/operators/kubernetes_pod.py | 517 ++++++++++-------- .../kubernetes/operators/spark_kubernetes.py | 16 +- .../providers/cncf/kubernetes/provider.yaml | 8 + .../kubernetes/sensors/spark_kubernetes.py | 15 +- .../cncf/kubernetes/utils/pod_manager.py | 370 +++++++++++++ setup.py | 2 +- tests/kubernetes/test_client.py | 22 +- tests/kubernetes/test_refresh_config.py | 37 -- 20 files changed, 1220 insertions(+), 535 deletions(-) delete mode 100644 airflow/kubernetes/refresh_config.py create mode 100644 airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py create mode 100644 airflow/providers/cncf/kubernetes/utils/pod_manager.py delete mode 100644 tests/kubernetes/test_refresh_config.py diff --git a/UPDATING.md b/UPDATING.md index cd1151e30c511c..2685afcb58db2f 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -84,6 +84,12 @@ https://developers.google.com/style/inclusive-documentation ## Airflow 2.2.5 +### Minimum kubernetes version bumped from 3.0.0 to 21.7.0 + +No change in behavior is expected. This was necessary in order to take advantage of a [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) concerning refreshing of Kubernetes API tokens with EKS, which enabled the removal of some [workaround code](https://github.com/apache/airflow/pull/20759). + +### Deprecation: `Connection.extra` must be JSON-encoded dict + No breaking changes. ## Airflow 2.2.4 diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py index 1c20bd3b93a74d..aa497158a04ab9 100644 --- a/airflow/kubernetes/kube_client.py +++ b/airflow/kubernetes/kube_client.py @@ -25,39 +25,10 @@ try: from kubernetes import client, config from kubernetes.client import Configuration - from kubernetes.client.api_client import ApiClient from kubernetes.client.rest import ApiException - from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config - has_kubernetes = True - def _get_kube_config( - in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str] - ) -> Optional[Configuration]: - if in_cluster: - # load_incluster_config set default configuration with config populated by k8s - config.load_incluster_config() - return None - else: - # this block can be replaced with just config.load_kube_config once - # refresh_config module is replaced with upstream fix - cfg = RefreshConfiguration() - load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context) - return cfg - - def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api: - """ - This is a workaround for supporting api token refresh in k8s client. - - The function can be replace with `return client.CoreV1Api()` once the - upstream client supports token refresh. - """ - if cfg: - return client.CoreV1Api(api_client=ApiClient(configuration=cfg)) - else: - return client.CoreV1Api() - def _disable_verify_ssl() -> None: configuration = Configuration() configuration.verify_ssl = False @@ -130,17 +101,19 @@ def get_kube_client( if not has_kubernetes: raise _import_err - if not in_cluster: - if cluster_context is None: - cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None) - if config_file is None: - config_file = conf.get('kubernetes', 'config_file', fallback=None) - if conf.getboolean('kubernetes', 'enable_tcp_keepalive'): _enable_tcp_keepalive() if not conf.getboolean('kubernetes', 'verify_ssl'): _disable_verify_ssl() - client_conf = _get_kube_config(in_cluster, cluster_context, config_file) - return _get_client_with_patched_configuration(client_conf) + if in_cluster: + config.load_incluster_config() + else: + if cluster_context is None: + cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None) + if config_file is None: + config_file = conf.get('kubernetes', 'config_file', fallback=None) + config.load_kube_config(config_file=config_file, context=cluster_context) + + return client.CoreV1Api() diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py deleted file mode 100644 index 25649510ce0d29..00000000000000 --- a/airflow/kubernetes/refresh_config.py +++ /dev/null @@ -1,124 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -NOTE: this module can be removed once upstream client supports token refresh -see: https://github.com/kubernetes-client/python/issues/741 -""" - -import calendar -import logging -import os -import time -from typing import Optional, cast - -import pendulum -from kubernetes.client import Configuration -from kubernetes.config.exec_provider import ExecProvider -from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader - -from airflow.utils import yaml - - -def _parse_timestamp(ts_str: str) -> int: - parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str)) - return calendar.timegm(parsed_dt.timetuple()) - - -class RefreshKubeConfigLoader(KubeConfigLoader): - """ - Patched KubeConfigLoader, this subclass takes expirationTimestamp into - account and sets api key refresh callback hook in Configuration object - """ - - def __init__(self, *args, **kwargs): - KubeConfigLoader.__init__(self, *args, **kwargs) - self.api_key_expire_ts = None - - def _load_from_exec_plugin(self): - """ - We override _load_from_exec_plugin method to also read and store - expiration timestamp for aws-iam-authenticator. It will be later - used for api token refresh. - """ - if 'exec' not in self._user: - return None - try: - status = ExecProvider(self._user['exec']).run() - if 'token' not in status: - logging.error('exec: missing token field in plugin output') - return None - self.token = f"Bearer {status['token']}" - ts_str = status.get('expirationTimestamp') - if ts_str: - self.api_key_expire_ts = _parse_timestamp(ts_str) - return True - except Exception as e: - logging.error(str(e)) - return None - - def refresh_api_key(self, client_configuration): - """Refresh API key if expired""" - if self.api_key_expire_ts and time.time() >= self.api_key_expire_ts: - self.load_and_set(client_configuration) - - def load_and_set(self, client_configuration): - KubeConfigLoader.load_and_set(self, client_configuration) - client_configuration.refresh_api_key = self.refresh_api_key - - -class RefreshConfiguration(Configuration): - """ - Patched Configuration, this subclass takes api key refresh callback hook - into account - """ - - def __init__(self, *args, **kwargs): - Configuration.__init__(self, *args, **kwargs) - self.refresh_api_key = None - - def get_api_key_with_prefix(self, identifier): - if self.refresh_api_key: - self.refresh_api_key(self) - return Configuration.get_api_key_with_prefix(self, identifier) - - -def _get_kube_config_loader_for_yaml_file(filename, **kwargs) -> Optional[RefreshKubeConfigLoader]: - """ - Adapted from the upstream _get_kube_config_loader_for_yaml_file function, changed - KubeConfigLoader to RefreshKubeConfigLoader - """ - with open(filename) as f: - return RefreshKubeConfigLoader( - config_dict=yaml.safe_load(f), - config_base_path=os.path.abspath(os.path.dirname(filename)), - **kwargs, - ) - - -def load_kube_config(client_configuration, config_file=None, context=None): - """ - Adapted from the upstream load_kube_config function, changes: - - removed persist_config argument since it's not being used - - remove `client_configuration is None` branch since we always pass - in client configuration - """ - if config_file is None: - config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION) - - loader = _get_kube_config_loader_for_yaml_file(config_file, active_context=context, config_persister=None) - loader.load_and_set(client_configuration) diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst index 7f686b22f68a4f..5bcf56941f55ed 100644 --- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst +++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst @@ -19,6 +19,231 @@ Changelog --------- +3.1.2 +..... + +Bug Fixes +~~~~~~~~~ + +* ``Fix mistakenly added install_requires for all providers (#22382)`` +* ``Fix "run_id" k8s and elasticsearch compatibility with Airflow 2.1 (#22385)`` + +Misc +~~~~ + +* ``Remove RefreshConfiguration workaround for K8s token refreshing (#20759)`` + +3.1.1 +..... + +Misc +~~~~~ + +* ``Add Trove classifiers in PyPI (Framework :: Apache Airflow :: Provider)`` + +3.1.0 +..... + +Features +~~~~~~~~ + +* ``Add map_index label to mapped KubernetesPodOperator (#21916)`` +* ``Change KubePodOperator labels from exeuction_date to run_id (#21960)`` + +Misc +~~~~ + +* ``Support for Python 3.10`` +* ``Fix Kubernetes example with wrong operator casing (#21898)`` +* ``Remove types from KPO docstring (#21826)`` + +.. Below changes are excluded from the changelog. Move them to + appropriate section above if needed. Do not delete the lines(!): + * ``Add pre-commit check for docstring param types (#21398)`` + +3.0.2 +..... + +Bug Fixes +~~~~~~~~~ + +* ``Add missed deprecations for cncf (#20031)`` + +.. Below changes are excluded from the changelog. Move them to + appropriate section above if needed. Do not delete the lines(!): + * ``Remove ':type' lines now sphinx-autoapi supports typehints (#20951)`` + * ``Make ''delete_pod'' change more prominent in K8s changelog (#20753)`` + * ``Fix MyPy Errors for providers: Tableau, CNCF, Apache (#20654)`` + * ``Add optional features in providers. (#21074)`` + * ``Add documentation for January 2021 providers release (#21257)`` + +3.0.1 +..... + + +Misc +~~~~ + +* ``Update Kubernetes library version (#18797)`` + +.. Below changes are excluded from the changelog. Move them to + appropriate section above if needed. Do not delete the lines(!): + +3.0.0 +..... + +Breaking changes +~~~~~~~~~~~~~~~~ + +* ``Parameter is_delete_operator_pod default is changed to True (#20575)`` +* ``Simplify KubernetesPodOperator (#19572)`` +* ``Move pod_mutation_hook call from PodManager to KubernetesPodOperator (#20596)`` +* ``Rename ''PodLauncher'' to ''PodManager'' (#20576)`` + +Parameter is_delete_operator_pod has new default +```````````````````````````````````````````````` + +Previously, the default for param ``is_delete_operator_pod`` was ``False``, which means that +after a task runs, its pod is not deleted by the operator and remains on the +cluster indefinitely. With this release, we change the default to ``True``. + +Notes on changes KubernetesPodOperator and PodLauncher +`````````````````````````````````````````````````````` + +.. warning:: Many methods in ``KubernetesPodOperator`` and ``PodLauncher`` have been renamed. + If you have subclassed ``KubernetesPodOperator`` you will need to update your subclass to reflect + the new structure. Additionally ``PodStatus`` enum has been renamed to ``PodPhase``. + +Overview +'''''''' + +Generally speaking if you did not subclass ``KubernetesPodOperator`` and you didn't use the ``PodLauncher`` class directly, +then you don't need to worry about this change. If however you have subclassed ``KubernetesPodOperator``, what +follows are some notes on the changes in this release. + +One of the principal goals of the refactor is to clearly separate the "get or create pod" and +"wait for pod completion" phases. Previously the "wait for pod completion" logic would be invoked +differently depending on whether the operator were to "attach to an existing pod" (e.g. after a +worker failure) or "create a new pod" and this resulted in some code duplication and a bit more +nesting of logic. With this refactor we encapsulate the "get or create" step +into method ``KubernetesPodOperator.get_or_create_pod``, and pull the monitoring and XCom logic up +into the top level of ``execute`` because it can be the same for "attached" pods and "new" pods. + +The ``KubernetesPodOperator.get_or_create_pod`` tries first to find an existing pod using labels +specific to the task instance (see ``KubernetesPodOperator.find_pod``). +If one does not exist it ``creates a pod <~.PodManager.create_pod>``. + +The "waiting" part of execution has three components. The first step is to wait for the pod to leave the +``Pending`` phase (``~.KubernetesPodOperator.await_pod_start``). Next, if configured to do so, +the operator will follow the base container logs and forward these logs to the task logger until +the ``base`` container is done. If not configured to harvest the +logs, the operator will instead ``KubernetesPodOperator.await_container_completion`` +either way, we must await container completion before harvesting xcom. After (optionally) extracting the xcom +value from the base container, we ``await pod completion <~.PodManager.await_pod_completion>``. + +Previously, depending on whether the pod was "reattached to" (e.g. after a worker failure) or +created anew, the waiting logic may have occurred in either ``handle_pod_overlap`` or ``create_new_pod_for_operator``. + +After the pod terminates, we execute different cleanup tasks depending on whether the pod terminated successfully. + +If the pod terminates *unsuccessfully*, we attempt to log the pod events ``PodLauncher.read_pod_events>``. If +additionally the task is configured *not* to delete the pod after termination, we apply a label ``KubernetesPodOperator.patch_already_checked>`` +indicating that the pod failed and should not be "reattached to" in a retry. If the task is configured +to delete its pod, we delete it ``KubernetesPodOperator.process_pod_deletion>``. Finally, +we raise an AirflowException to fail the task instance. + +If the pod terminates successfully, we delete the pod ``KubernetesPodOperator.process_pod_deletion>`` +(if configured to delete the pod) and push XCom (if configured to push XCom). + +Details on method renames, refactors, and deletions +''''''''''''''''''''''''''''''''''''''''''''''''''' + +In ``KubernetesPodOperator``: + +* Method ``create_pod_launcher`` is converted to cached property ``pod_manager`` +* Construction of k8s ``CoreV1Api`` client is now encapsulated within cached property ``client`` +* Logic to search for an existing pod (e.g. after an airflow worker failure) is moved out of ``execute`` and into method ``find_pod``. +* Method ``handle_pod_overlap`` is removed. Previously it monitored a "found" pod until completion. With this change the pod monitoring (and log following) is orchestrated directly from ``execute`` and it is the same whether it's a "found" pod or a "new" pod. See methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``. +* Method ``create_pod_request_obj`` is renamed ``build_pod_request_obj``. It now takes argument ``context`` in order to add TI-specific pod labels; previously they were added after return. +* Method ``create_labels_for_pod`` is renamed ``_get_ti_pod_labels``. This method doesn't return *all* labels, but only those specific to the TI. We also add parameter ``include_try_number`` to control the inclusion of this label instead of possibly filtering it out later. +* Method ``_get_pod_identifying_label_string`` is renamed ``_build_find_pod_label_selector`` +* Method ``_try_numbers_match`` is removed. +* Method ``create_new_pod_for_operator`` is removed. Previously it would mutate the labels on ``self.pod``, launch the pod, monitor the pod to completion etc. Now this logic is in part handled by ``get_or_create_pod``, where a new pod will be created if necessary. The monitoring etc is now orchestrated directly from ``execute``. Again, see the calls to methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``. + +In class ``PodManager`` (formerly ``PodLauncher``): + +* Method ``start_pod`` is removed and split into two methods: ``create_pod`` and ``await_pod_start``. +* Method ``monitor_pod`` is removed and split into methods ``follow_container_logs``, ``await_container_completion``, ``await_pod_completion`` +* Methods ``pod_not_started``, ``pod_is_running``, ``process_status``, and ``_task_status`` are removed. These were needed due to the way in which pod ``phase`` was mapped to task instance states; but we no longer do such a mapping and instead deal with pod phases directly and untransformed. +* Method ``_extract_xcom`` is renamed ``extract_xcom``. +* Method ``read_pod_logs`` now takes kwarg ``container_name`` + + +Other changes in ``pod_manager.py`` (formerly ``pod_launcher.py``): + +* Class ``pod_launcher.PodLauncher`` renamed to ``pod_manager.PodManager`` +* Enum-like class ``PodStatus`` is renamed ``PodPhase``, and the values are no longer lower-cased. +* The ``airflow.settings.pod_mutation_hook`` is no longer called in + ``cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async``. For ``KubernetesPodOperator``, + mutation now occurs in ``build_pod_request_obj``. +* Parameter ``is_delete_operator_pod`` default is changed to ``True`` so that pods are deleted after task + completion and not left to accumulate. In practice it seems more common to disable pod deletion only on a + temporary basis for debugging purposes and therefore pod deletion is the more sensible default. + +Features +~~~~~~~~ + +* ``Add params config, in_cluster, and cluster_context to KubernetesHook (#19695)`` +* ``Implement dry_run for KubernetesPodOperator (#20573)`` +* ``Clarify docstring for ''build_pod_request_obj'' in K8s providers (#20574)`` + +Bug Fixes +~~~~~~~~~ + +* ``Fix Volume/VolumeMount KPO DeprecationWarning (#19726)`` + +.. Below changes are excluded from the changelog. Move them to + appropriate section above if needed. Do not delete the lines(!): + * ``Fix cached_property MyPy declaration and related MyPy errors (#20226)`` + * ``Use typed Context EVERYWHERE (#20565)`` + * ``Fix template_fields type to have MyPy friendly Sequence type (#20571)`` + * ``Even more typing in operators (template_fields/ext) (#20608)`` + * ``Update documentation for provider December 2021 release (#20523)`` + +2.2.0 +..... + +Features +~~~~~~~~ + +* ``Added namespace as a template field in the KPO. (#19718)`` +* ``Decouple name randomization from name kwarg (#19398)`` + +Bug Fixes +~~~~~~~~~ + +* ``Checking event.status.container_statuses before filtering (#19713)`` +* ``Coalesce 'extra' params to None in KubernetesHook (#19694)`` +* ``Change to correct type in KubernetesPodOperator (#19459)`` + +.. Below changes are excluded from the changelog. Move them to + appropriate section above if needed. Do not delete the lines(!): + * ``Fix duplicate changelog entries (#19759)`` + +2.1.0 +..... + +Features +~~~~~~~~ + +* ``Add more type hints to PodLauncher (#18928)`` +* ``Add more information to PodLauncher timeout error (#17953)`` + +.. Below changes are excluded from the changelog. Move them to + appropriate section above if needed. Do not delete the lines(!): + * ``Update docstring to let users use 'node_selector' (#19057)`` + * ``Add pre-commit hook for common misspelling check in files (#18964)`` + 2.0.3 ..... @@ -44,7 +269,8 @@ Bug Fixes * ``Fix using XCom with ''KubernetesPodOperator'' (#17760)`` * ``Import Hooks lazily individually in providers manager (#17682)`` -.. Review and move the new changes to one of the sections above: +.. Below changes are excluded from the changelog. Move them to + appropriate section above if needed. Do not delete the lines(!): * ``Fix messed-up changelog in 3 providers (#17380)`` * ``Fix static checks (#17256)`` * ``Update spark_kubernetes.py (#17237)`` @@ -65,10 +291,7 @@ Bug Fixes .. Below changes are excluded from the changelog. Move them to appropriate section above if needed. Do not delete the lines(!): - * ``Fixed wrongly escaped characters in amazon's changelog (#17020)`` * ``Simplify 'default_args' in Kubernetes example DAGs (#16870)`` - * ``Enable using custom pod launcher in Kubernetes Pod Operator (#16945)`` - * ``Prepare documentation for July release of providers. (#17015)`` * ``Updating task dependencies (#16624)`` * ``Removes pylint from our toolchain (#16682)`` * ``Prepare documentation for July release of providers. (#17015)`` diff --git a/airflow/providers/cncf/kubernetes/__init__.py b/airflow/providers/cncf/kubernetes/__init__.py index 217e5db9607827..0998e31143fc8e 100644 --- a/airflow/providers/cncf/kubernetes/__init__.py +++ b/airflow/providers/cncf/kubernetes/__init__.py @@ -15,3 +15,30 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import sys + +if sys.version_info < (3, 7): + # This is needed because the Python Kubernetes client >= 12.0 contains a logging object, meaning that + # v1.Pod et al. are not pickleable on Python 3.6. + + # Python 3.7 added this via https://bugs.python.org/issue30520 in 2017 -- but Python 3.6 doesn't have this + # method. + + # This is duplicated/backported from airflow.logging_config in 2.2, but by having it here as well it means + # that we can update the version used in this provider and have it work for older versions + import copyreg + import logging + + def _reduce_Logger(logger): + if logging.getLogger(logger.name) is not logger: + import pickle + + raise pickle.PicklingError('logger cannot be pickled') + return logging.getLogger, (logger.name,) + + def _reduce_RootLogger(logger): + return logging.getLogger, () + + if logging.Logger not in copyreg.dispatch_table: + copyreg.pickle(logging.Logger, _reduce_Logger) + copyreg.pickle(logging.RootLogger, _reduce_RootLogger) diff --git a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py index 4c6404f0548324..bf2b8329f8ccdc 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py +++ b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py @@ -21,18 +21,16 @@ from kubernetes.client import ApiClient, models as k8s from airflow.exceptions import AirflowException -from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources -from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv -def _convert_kube_model_object(obj, old_class, new_class): +def _convert_kube_model_object(obj, new_class): convert_op = getattr(obj, "to_k8s_client_obj", None) if callable(convert_op): return obj.to_k8s_client_obj() elif isinstance(obj, new_class): return obj else: - raise AirflowException(f"Expected {old_class} or {new_class}, got {type(obj)}") + raise AirflowException(f"Expected {new_class}, got {type(obj)}") def _convert_from_dict(obj, new_class): @@ -52,9 +50,7 @@ def convert_volume(volume) -> k8s.V1Volume: :param volume: :return: k8s.V1Volume """ - from airflow.providers.cncf.kubernetes.backcompat.volume import Volume - - return _convert_kube_model_object(volume, Volume, k8s.V1Volume) + return _convert_kube_model_object(volume, k8s.V1Volume) def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount: @@ -64,9 +60,7 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount: :param volume_mount: :return: k8s.V1VolumeMount """ - from airflow.providers.cncf.kubernetes.backcompat.volume_mount import VolumeMount - - return _convert_kube_model_object(volume_mount, VolumeMount, k8s.V1VolumeMount) + return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount) def convert_resources(resources) -> k8s.V1ResourceRequirements: @@ -77,8 +71,10 @@ def convert_resources(resources) -> k8s.V1ResourceRequirements: :return: k8s.V1ResourceRequirements """ if isinstance(resources, dict): + from airflow.providers.cncf.kubernetes.backcompat.pod import Resources + resources = Resources(**resources) - return _convert_kube_model_object(resources, Resources, k8s.V1ResourceRequirements) + return _convert_kube_model_object(resources, k8s.V1ResourceRequirements) def convert_port(port) -> k8s.V1ContainerPort: @@ -88,7 +84,7 @@ def convert_port(port) -> k8s.V1ContainerPort: :param port: :return: k8s.V1ContainerPort """ - return _convert_kube_model_object(port, Port, k8s.V1ContainerPort) + return _convert_kube_model_object(port, k8s.V1ContainerPort) def convert_env_vars(env_vars) -> List[k8s.V1EnvVar]: @@ -116,7 +112,7 @@ def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar: :param pod_runtime_info_envs: :return: """ - return _convert_kube_model_object(pod_runtime_info_envs, PodRuntimeInfoEnv, k8s.V1EnvVar) + return _convert_kube_model_object(pod_runtime_info_envs, k8s.V1EnvVar) def convert_image_pull_secrets(image_pull_secrets) -> List[k8s.V1LocalObjectReference]: diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod.py b/airflow/providers/cncf/kubernetes/backcompat/pod.py index 30a7128b399e8e..7f18117e18bfa2 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/pod.py +++ b/airflow/providers/cncf/kubernetes/backcompat/pod.py @@ -14,13 +14,29 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Classes for interacting with Kubernetes API""" +""" +Classes for interacting with Kubernetes API. + +This module is deprecated. Please use :mod:`kubernetes.client.models.V1ResourceRequirements` +and :mod:`kubernetes.client.models.V1ContainerPort`. +""" + +import warnings from kubernetes.client import models as k8s +warnings.warn( + ( + "This module is deprecated. Please use `kubernetes.client.models.V1ResourceRequirements`" + " and `kubernetes.client.models.V1ContainerPort`." + ), + DeprecationWarning, + stacklevel=2, +) + class Resources: - """backwards compat for Resources""" + """backwards compat for Resources.""" __slots__ = ( 'request_memory', @@ -34,19 +50,12 @@ class Resources: """ :param request_memory: requested memory - :type request_memory: str :param request_cpu: requested CPU number - :type request_cpu: float | str :param request_ephemeral_storage: requested ephemeral storage - :type request_ephemeral_storage: str :param limit_memory: limit for memory usage - :type limit_memory: str :param limit_cpu: Limit for CPU used - :type limit_cpu: float | str :param limit_gpu: Limits for GPU used - :type limit_gpu: int :param limit_ephemeral_storage: Limit for ephemeral storage - :type limit_ephemeral_storage: float | str """ def __init__( diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py index f76e0d7d0bb213..f08aecff33a89c 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py +++ b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py @@ -14,13 +14,25 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Classes for interacting with Kubernetes API""" +""" +Classes for interacting with Kubernetes API. + +This module is deprecated. Please use :mod:`kubernetes.client.models.V1EnvVar`. +""" + +import warnings import kubernetes.client.models as k8s +warnings.warn( + "This module is deprecated. Please use `kubernetes.client.models.V1EnvVar`.", + DeprecationWarning, + stacklevel=2, +) + class PodRuntimeInfoEnv: - """Defines Pod runtime information as environment variable""" + """Defines Pod runtime information as environment variable.""" def __init__(self, name, field_path): """ @@ -28,9 +40,7 @@ def __init__(self, name, field_path): Full list of options can be found in kubernetes documentation. :param name: the name of the environment variable - :type: name: str :param field_path: path to pod runtime info. Ex: metadata.namespace | status.podIP - :type: field_path: str """ self.name = name self.field_path = field_path diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume.py b/airflow/providers/cncf/kubernetes/backcompat/volume.py index e5b4d004ed0ca9..c51ce8a551e384 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/volume.py +++ b/airflow/providers/cncf/kubernetes/backcompat/volume.py @@ -35,10 +35,8 @@ def __init__(self, name, configs): and Persistent Volumes :param name: the name of the volume mount - :type name: str :param configs: dictionary of any features needed for volume. We purposely keep this vague since there are multiple volume types with changing configs. - :type configs: dict """ self.name = name self.configs = configs diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py index b77ab47cd8a3a6..f9faed9d04a974 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py +++ b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py @@ -38,13 +38,9 @@ def __init__(self, name, mount_path, sub_path, read_only): running container. :param name: the name of the volume mount - :type name: str :param mount_path: - :type mount_path: str :param sub_path: subpath within the volume mount - :type sub_path: Optional[str] :param read_only: whether to access pod with read-only mode - :type read_only: bool """ self.name = name self.mount_path = mount_path diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py b/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py new file mode 100644 index 00000000000000..b65dae9f4e52af --- /dev/null +++ b/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py @@ -0,0 +1,163 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +This is an example dag for using the KubernetesPodOperator. +""" + +from datetime import datetime + +from kubernetes.client import models as k8s + +from airflow import DAG +from airflow.kubernetes.secret import Secret +from airflow.operators.bash import BashOperator +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator + +# [START howto_operator_k8s_cluster_resources] +secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn') +secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn') +secret_all_keys = Secret('env', None, 'airflow-secrets-2') +volume_mount = k8s.V1VolumeMount( + name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True +) + +configmaps = [ + k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-1')), + k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-2')), +] + +volume = k8s.V1Volume( + name='test-volume', + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'), +) + +port = k8s.V1ContainerPort(name='http', container_port=80) + +init_container_volume_mounts = [ + k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True) +] + +init_environments = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')] + +init_container = k8s.V1Container( + name="init-container", + image="ubuntu:16.04", + env=init_environments, + volume_mounts=init_container_volume_mounts, + command=["bash", "-cx"], + args=["echo 10"], +) + +affinity = k8s.V1Affinity( + node_affinity=k8s.V1NodeAffinity( + preferred_during_scheduling_ignored_during_execution=[ + k8s.V1PreferredSchedulingTerm( + weight=1, + preference=k8s.V1NodeSelectorTerm( + match_expressions=[ + k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"]) + ] + ), + ) + ] + ), + pod_affinity=k8s.V1PodAffinity( + required_during_scheduling_ignored_during_execution=[ + k8s.V1WeightedPodAffinityTerm( + weight=1, + pod_affinity_term=k8s.V1PodAffinityTerm( + label_selector=k8s.V1LabelSelector( + match_expressions=[ + k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1") + ] + ), + topology_key="failure-domain.beta.kubernetes.io/zone", + ), + ) + ] + ), +) + +tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")] + +# [END howto_operator_k8s_cluster_resources] + + +with DAG( + dag_id='example_kubernetes_operator', + schedule_interval=None, + start_date=datetime(2021, 1, 1), + tags=['example'], +) as dag: + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + secrets=[secret_file, secret_env, secret_all_keys], + ports=[port], + volumes=[volume], + volume_mounts=[volume_mount], + env_from=configmaps, + name="airflow-test-pod", + task_id="task", + affinity=affinity, + is_delete_operator_pod=True, + hostnetwork=False, + tolerations=tolerations, + init_containers=[init_container], + priority_class_name="medium", + ) + + # [START howto_operator_k8s_private_image] + quay_k8s = KubernetesPodOperator( + namespace='default', + image='quay.io/apache/bash', + image_pull_secrets=[k8s.V1LocalObjectReference('testquay')], + cmds=["bash", "-cx"], + arguments=["echo", "10", "echo pwd"], + labels={"foo": "bar"}, + name="airflow-private-image-pod", + is_delete_operator_pod=True, + in_cluster=True, + task_id="task-two", + get_logs=True, + ) + # [END howto_operator_k8s_private_image] + + # [START howto_operator_k8s_write_xcom] + write_xcom = KubernetesPodOperator( + namespace='default', + image='alpine', + cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"], + name="write-xcom", + do_xcom_push=True, + is_delete_operator_pod=True, + in_cluster=True, + task_id="write-xcom", + get_logs=True, + ) + + pod_task_xcom_result = BashOperator( + bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", + task_id="pod_task_xcom_result", + ) + # [END howto_operator_k8s_write_xcom] + + write_xcom >> pod_task_xcom_result diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index e230dba34b017c..38305031dd0222 100644 --- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -14,19 +14,21 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import sys import tempfile from typing import Any, Dict, Generator, Optional, Tuple, Union -try: +if sys.version_info >= (3, 8): from functools import cached_property -except ImportError: +else: from cached_property import cached_property + from kubernetes import client, config, watch try: import airflow.utils.yaml as yaml except ImportError: - import yaml + import yaml # type: ignore[no-redef] from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook @@ -59,7 +61,6 @@ class KubernetesHook(BaseHook): :param conn_id: The :ref:`kubernetes connection ` to Kubernetes cluster. - :type conn_id: str """ conn_name_attr = 'kubernetes_conn_id' @@ -85,10 +86,13 @@ def get_connection_form_widgets() -> Dict[str, Any]: "extra__kubernetes__namespace": StringField( lazy_gettext('Namespace'), widget=BS3TextFieldWidget() ), + "extra__kubernetes__cluster_context": StringField( + lazy_gettext('Cluster context'), widget=BS3TextFieldWidget() + ), } @staticmethod - def get_ui_field_behaviour() -> Dict: + def get_ui_field_behaviour() -> Dict[str, Any]: """Returns custom field behaviour""" return { "hidden_fields": ['host', 'schema', 'login', 'password', 'port', 'extra'], @@ -96,25 +100,49 @@ def get_ui_field_behaviour() -> Dict: } def __init__( - self, conn_id: str = default_conn_name, client_configuration: Optional[client.Configuration] = None + self, + conn_id: Optional[str] = default_conn_name, + client_configuration: Optional[client.Configuration] = None, + cluster_context: Optional[str] = None, + config_file: Optional[str] = None, + in_cluster: Optional[bool] = None, ) -> None: super().__init__() self.conn_id = conn_id self.client_configuration = client_configuration + self.cluster_context = cluster_context + self.config_file = config_file + self.in_cluster = in_cluster + + @staticmethod + def _coalesce_param(*params): + for param in params: + if param is not None: + return param def get_conn(self) -> Any: """Returns kubernetes api session for use with requests""" - connection = self.get_connection(self.conn_id) - extras = connection.extra_dejson - in_cluster = extras.get("extra__kubernetes__in_cluster") - kubeconfig_path = extras.get("extra__kubernetes__kube_config_path") - kubeconfig = extras.get("extra__kubernetes__kube_config") + if self.conn_id: + connection = self.get_connection(self.conn_id) + extras = connection.extra_dejson + else: + extras = {} + in_cluster = self._coalesce_param( + self.in_cluster, extras.get("extra__kubernetes__in_cluster") or None + ) + cluster_context = self._coalesce_param( + self.cluster_context, extras.get("extra__kubernetes__cluster_context") or None + ) + kubeconfig_path = self._coalesce_param( + self.config_file, extras.get("extra__kubernetes__kube_config_path") or None + ) + kubeconfig = extras.get("extra__kubernetes__kube_config") or None num_selected_configuration = len([o for o in [in_cluster, kubeconfig, kubeconfig_path] if o]) if num_selected_configuration > 1: raise AirflowException( - "Invalid connection configuration. Options extra__kubernetes__kube_config_path, " - "extra__kubernetes__kube_config, extra__kubernetes__in_cluster are mutually exclusive. " + "Invalid connection configuration. Options kube_config_path, " + "kube_config, in_cluster are mutually exclusive. " "You can only use one option at a time." ) if in_cluster: @@ -125,7 +153,9 @@ def get_conn(self) -> Any: if kubeconfig_path is not None: self.log.debug("loading kube_config from: %s", kubeconfig_path) config.load_kube_config( - config_file=kubeconfig_path, client_configuration=self.client_configuration + config_file=kubeconfig_path, + client_configuration=self.client_configuration, + context=cluster_context, ) return client.ApiClient() @@ -135,12 +165,17 @@ def get_conn(self) -> Any: temp_config.write(kubeconfig.encode()) temp_config.flush() config.load_kube_config( - config_file=temp_config.name, client_configuration=self.client_configuration + config_file=temp_config.name, + client_configuration=self.client_configuration, + context=cluster_context, ) return client.ApiClient() self.log.debug("loading kube_config from: default file") - config.load_kube_config(client_configuration=self.client_configuration) + config.load_kube_config( + client_configuration=self.client_configuration, + context=cluster_context, + ) return client.ApiClient() @cached_property @@ -148,6 +183,10 @@ def api_client(self) -> Any: """Cached Kubernetes API client""" return self.get_conn() + @cached_property + def core_v1_client(self): + return client.CoreV1Api(api_client=self.api_client) + def create_custom_object( self, group: str, version: str, plural: str, body: Union[str, dict], namespace: Optional[str] = None ): @@ -155,15 +194,10 @@ def create_custom_object( Creates custom resource definition object in Kubernetes :param group: api group - :type group: str :param version: api version - :type version: str :param plural: api plural - :type plural: str :param body: crd object definition - :type body: Union[str, dict] :param namespace: kubernetes namespace - :type namespace: str """ api = client.CustomObjectsApi(self.api_client) if namespace is None: @@ -186,15 +220,10 @@ def get_custom_object( Get custom resource definition object from Kubernetes :param group: api group - :type group: str :param version: api version - :type version: str :param plural: api plural - :type plural: str :param name: crd object name - :type name: str :param namespace: kubernetes namespace - :type namespace: str """ api = client.CustomObjectsApi(self.api_client) if namespace is None: @@ -207,12 +236,14 @@ def get_custom_object( except client.rest.ApiException as e: raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n") - def get_namespace(self) -> str: + def get_namespace(self) -> Optional[str]: """Returns the namespace that defined in the connection""" - connection = self.get_connection(self.conn_id) - extras = connection.extra_dejson - namespace = extras.get("extra__kubernetes__namespace", "default") - return namespace + if self.conn_id: + connection = self.get_connection(self.conn_id) + extras = connection.extra_dejson + namespace = extras.get("extra__kubernetes__namespace", "default") + return namespace + return None def get_pod_log_stream( self, @@ -224,10 +255,8 @@ def get_pod_log_stream( Retrieves a log stream for a container in a kubernetes pod. :param pod_name: pod name - :type pod_name: str :param container: container name :param namespace: kubernetes namespace - :type namespace: str """ api = client.CoreV1Api(self.api_client) watcher = watch.Watch() @@ -251,10 +280,8 @@ def get_pod_logs( Retrieves a container's log from the specified pod. :param pod_name: pod name - :type pod_name: str :param container: container name :param namespace: kubernetes namespace - :type namespace: str """ api = client.CoreV1Api(self.api_client) return api.read_namespaced_pod_log( diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 747f8b024e6675..dd127fee76ee1a 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -15,17 +15,16 @@ # specific language governing permissions and limitations # under the License. """Executes task in a Kubernetes POD""" +import json +import logging import re +import sys import warnings -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type +from contextlib import AbstractContextManager +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence from kubernetes.client import CoreV1Api, models as k8s -try: - import airflow.utils.yaml as yaml -except ImportError: - import yaml - from airflow.exceptions import AirflowException from airflow.kubernetes import kube_client, pod_generator from airflow.kubernetes.pod_generator import PodGenerator @@ -43,15 +42,27 @@ convert_volume, convert_volume_mount, ) -from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv -from airflow.providers.cncf.kubernetes.utils import pod_launcher, xcom_sidecar +from airflow.providers.cncf.kubernetes.utils import xcom_sidecar +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase +from airflow.settings import pod_mutation_hook +from airflow.utils import yaml from airflow.utils.helpers import validate_key -from airflow.utils.state import State from airflow.version import version as airflow_version +if sys.version_info >= (3, 8): + from functools import cached_property +else: + from cached_property import cached_property + if TYPE_CHECKING: import jinja2 + from airflow.utils.context import Context + + +class PodReattachFailure(AirflowException): + """When we expect to be able to find a pod but cannot.""" + class KubernetesPodOperator(BaseOperator): """ @@ -68,101 +79,66 @@ class KubernetesPodOperator(BaseOperator): simplifies the authorization process. :param namespace: the namespace to run within kubernetes. - :type namespace: str :param image: Docker image you wish to launch. Defaults to hub.docker.com, but fully qualified URLS will point to custom repositories. (templated) - :type image: str :param name: name of the pod in which the task will run, will be used (plus a random - suffix) to generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]). - :type name: str + suffix if random_name_suffix is True) to generate a pod id (DNS-1123 subdomain, + containing only [a-z0-9.-]). + :param random_name_suffix: if True, will generate a random suffix. :param cmds: entrypoint of the container. (templated) The docker images's entrypoint is used if this is not provided. - :type cmds: list[str] :param arguments: arguments of the entrypoint. (templated) The docker image's CMD is used if this is not provided. - :type arguments: list[str] - :param ports: ports for launched pod. - :type ports: list[k8s.V1ContainerPort] - :param volume_mounts: volumeMounts for launched pod. - :type volume_mounts: list[k8s.V1VolumeMount] - :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes. - :type volumes: list[k8s.V1Volume] + :param ports: ports for the launched pod. + :param volume_mounts: volumeMounts for the launched pod. + :param volumes: volumes for the launched pod. Includes ConfigMaps and PersistentVolumes. :param env_vars: Environment variables initialized in the container. (templated) - :type env_vars: list[k8s.V1EnvVar] :param secrets: Kubernetes secrets to inject in the container. They can be exposed as environment vars or files in a volume. - :type secrets: list[airflow.kubernetes.secret.Secret] :param in_cluster: run kubernetes client with in_cluster configuration. - :type in_cluster: bool :param cluster_context: context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used. - :type cluster_context: str :param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor - :type reattach_on_restart: bool :param labels: labels to apply to the Pod. (templated) - :type labels: dict :param startup_timeout_seconds: timeout in seconds to startup the pod. - :type startup_timeout_seconds: int :param get_logs: get the stdout of the container as logs of the tasks. - :type get_logs: bool :param image_pull_policy: Specify a policy to cache or always pull an image. - :type image_pull_policy: str :param annotations: non-identifying metadata you can attach to the Pod. Can be a large range of data, and can include characters that are not permitted by labels. - :type annotations: dict - :param resources: A dict containing resources requests and limits. - Possible keys are request_memory, request_cpu, limit_memory, limit_cpu, - and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources. - See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container - :type resources: k8s.V1ResourceRequirements - :param affinity: A dict containing a group of affinity scheduling rules. - :type affinity: k8s.V1Affinity + :param resources: resources for the launched pod. + :param affinity: affinity scheduling rules for the launched pod. :param config_file: The path to the Kubernetes config file. (templated) If not specified, default value is ``~/.kube/config`` - :type config_file: str - :param node_selectors: A dict containing a group of scheduling rules. - :type node_selectors: dict + :param node_selector: A dict containing a group of scheduling rules. :param image_pull_secrets: Any image pull secrets to be given to the pod. If more than one secret is required, provide a comma separated list: secret_a,secret_b - :type image_pull_secrets: List[k8s.V1LocalObjectReference] :param service_account_name: Name of the service account - :type service_account_name: str :param is_delete_operator_pod: What to do when the pod reaches its final - state, or the execution is interrupted. - If False (default): do nothing, If True: delete the pod - :type is_delete_operator_pod: bool + state, or the execution is interrupted. If True (default), delete the + pod; if False, leave the pod. :param hostnetwork: If True enable host networking on the pod. - :type hostnetwork: bool :param tolerations: A list of kubernetes tolerations. - :type tolerations: List[k8s.V1Toleration] :param security_context: security options the pod should run with (PodSecurityContext). - :type security_context: dict :param dnspolicy: dnspolicy for the pod. - :type dnspolicy: str :param schedulername: Specify a schedulername for the pod - :type schedulername: str :param full_pod_spec: The complete podSpec - :type full_pod_spec: kubernetes.client.models.V1Pod :param init_containers: init container for the launched Pod - :type init_containers: list[kubernetes.client.models.V1Container] :param log_events_on_failure: Log the pod's events if a failure occurs - :type log_events_on_failure: bool :param do_xcom_push: If True, the content of the file /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes. - :type do_xcom_push: bool :param pod_template_file: path to pod template file (templated) - :type pod_template_file: str :param priority_class_name: priority class name for the launched Pod - :type priority_class_name: str :param termination_grace_period: Termination grace period if task killed in UI, defaults to kubernetes default - :type termination_grace_period: int """ - template_fields: Iterable[str] = ( + BASE_CONTAINER_NAME = 'base' + POD_CHECKED_KEY = 'already_checked' + + template_fields: Sequence[str] = ( 'image', 'cmds', 'arguments', @@ -170,16 +146,16 @@ class KubernetesPodOperator(BaseOperator): 'labels', 'config_file', 'pod_template_file', + 'namespace', ) - # fmt: off def __init__( - # fmt: on self, *, namespace: Optional[str] = None, image: Optional[str] = None, name: Optional[str] = None, + random_name_suffix: Optional[bool] = True, cmds: Optional[List[str]] = None, arguments: Optional[List[str]] = None, ports: Optional[List[k8s.V1ContainerPort]] = None, @@ -203,7 +179,7 @@ def __init__( node_selector: Optional[dict] = None, image_pull_secrets: Optional[List[k8s.V1LocalObjectReference]] = None, service_account_name: Optional[str] = None, - is_delete_operator_pod: bool = False, + is_delete_operator_pod: bool = True, hostnetwork: bool = False, tolerations: Optional[List[k8s.V1Toleration]] = None, security_context: Optional[Dict] = None, @@ -215,9 +191,9 @@ def __init__( do_xcom_push: bool = False, pod_template_file: Optional[str] = None, priority_class_name: Optional[str] = None, - pod_runtime_info_envs: List[PodRuntimeInfoEnv] = None, + pod_runtime_info_envs: Optional[List[k8s.V1EnvVar]] = None, termination_grace_period: Optional[int] = None, - configmaps: Optional[str] = None, + configmaps: Optional[List[str]] = None, **kwargs, ) -> None: if kwargs.get('xcom_push') is not None: @@ -264,8 +240,9 @@ def __init__( self.service_account_name = service_account_name self.is_delete_operator_pod = is_delete_operator_pod self.hostnetwork = hostnetwork - self.tolerations = [convert_toleration(toleration) for toleration in tolerations] \ - if tolerations else [] + self.tolerations = ( + [convert_toleration(toleration) for toleration in tolerations] if tolerations else [] + ) self.security_context = security_context or {} self.dnspolicy = dnspolicy self.schedulername = schedulername @@ -275,14 +252,15 @@ def __init__( self.priority_class_name = priority_class_name self.pod_template_file = pod_template_file self.name = self._set_name(name) + self.random_name_suffix = random_name_suffix self.termination_grace_period = termination_grace_period - self.client: CoreV1Api = None - self.pod: k8s.V1Pod = None + self.pod_request_obj: Optional[k8s.V1Pod] = None + self.pod: Optional[k8s.V1Pod] = None def _render_nested_template_fields( self, content: Any, - context: Dict, + context: 'Context', jinja_env: "jinja2.Environment", seen_oids: set, ) -> None: @@ -291,27 +269,31 @@ def _render_nested_template_fields( self._do_render_template_fields(content, ('value', 'name'), context, jinja_env, seen_oids) return - super()._render_nested_template_fields( - content, - context, - jinja_env, - seen_oids - ) + super()._render_nested_template_fields(content, context, jinja_env, seen_oids) @staticmethod - def create_labels_for_pod(context) -> dict: + def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool = True) -> dict: """ Generate labels for the pod to track the pod in case of Operator crash :param context: task context provided by airflow DAG :return: dict """ - labels = { - 'dag_id': context['dag'].dag_id, - 'task_id': context['task'].task_id, - 'execution_date': context['ts'], - 'try_number': context['ti'].try_number, - } + if not context: + return {} + + ti = context['ti'] + run_id = context['run_id'] + + labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id} + + # If running on Airflow 2.3+: + map_index = getattr(ti, 'map_index', -1) + if map_index >= 0: + labels['map_index'] = map_index + + if include_try_number: + labels.update(try_number=ti.try_number) # In the case of sub dags this is just useful if context['dag'].is_subdag: labels['parent_dag_id'] = context['dag'].parent_dag.dag_id @@ -322,101 +304,127 @@ def create_labels_for_pod(context) -> dict: labels[label_id] = safe_label return labels - def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]: - return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push) + @cached_property + def pod_manager(self) -> PodManager: + return PodManager(kube_client=self.client) - def execute(self, context) -> Optional[str]: - try: - if self.in_cluster is not None: - client = kube_client.get_kube_client( - in_cluster=self.in_cluster, - cluster_context=self.cluster_context, - config_file=self.config_file, - ) - else: - client = kube_client.get_kube_client( - cluster_context=self.cluster_context, config_file=self.config_file - ) - - self.client = client - - self.pod = self.create_pod_request_obj() - self.namespace = self.pod.metadata.namespace - - # Add combination of labels to uniquely identify a running pod - labels = self.create_labels_for_pod(context) + @cached_property + def client(self) -> CoreV1Api: + # todo: use airflow Connection / hook to authenticate to the cluster + kwargs: Dict[str, Any] = dict( + cluster_context=self.cluster_context, + config_file=self.config_file, + ) + if self.in_cluster is not None: + kwargs.update(in_cluster=self.in_cluster) + return kube_client.get_kube_client(**kwargs) + + def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]: + """Returns an already-running pod for this task instance if one exists.""" + label_selector = self._build_find_pod_label_selector(context) + pod_list = self.client.list_namespaced_pod( + namespace=namespace, + label_selector=label_selector, + ).items + + pod = None + num_pods = len(pod_list) + if num_pods > 1: + raise AirflowException(f'More than one pod running with labels {label_selector}') + elif num_pods == 1: + pod = pod_list[0] + self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels) + self.log.info("`try_number` of task_instance: %s", context['ti'].try_number) + self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number']) + return pod - label_selector = self._get_pod_identifying_label_string(labels) + def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context): + if self.reattach_on_restart: + pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context) + if pod: + return pod + self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict())) + self.pod_manager.create_pod(pod=pod_request_obj) + return pod_request_obj - pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector) + def await_pod_start(self, pod): + try: + self.pod_manager.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds) + except PodLaunchFailedException: + if self.log_events_on_failure: + for event in self.pod_manager.read_pod_events(pod).items: + self.log.error("Pod Event: %s - %s", event.reason, event.message) + raise - if len(pod_list.items) > 1 and self.reattach_on_restart: - raise AirflowException( - f'More than one pod running with labels: {label_selector}' - ) + def extract_xcom(self, pod): + """Retrieves xcom value and kills xcom sidecar container""" + result = self.pod_manager.extract_xcom(pod) + self.log.info("xcom result: \n%s", result) + return json.loads(result) - launcher = self.create_pod_launcher() + def execute(self, context: 'Context'): + remote_pod = None + try: + self.pod_request_obj = self.build_pod_request_obj(context) + self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill` + pod_request_obj=self.pod_request_obj, + context=context, + ) + self.await_pod_start(pod=self.pod) - if len(pod_list.items) == 1: - try_numbers_match = self._try_numbers_match(context, pod_list.items[0]) - final_state, remote_pod, result = self.handle_pod_overlap( - labels, try_numbers_match, launcher, pod_list.items[0] + if self.get_logs: + self.pod_manager.fetch_container_logs( + pod=self.pod, + container_name=self.BASE_CONTAINER_NAME, + follow=True, ) else: - self.log.info("creating pod with labels %s and launcher %s", labels, launcher) - final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher) - if final_state != State.SUCCESS: - raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}') - context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name) - context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace) - return result - except AirflowException as ex: - raise AirflowException(f'Pod Launching failed: {ex}') - - def handle_pod_overlap( - self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod - ) -> Tuple[State, k8s.V1Pod, Optional[str]]: - """ + self.pod_manager.await_container_completion( + pod=self.pod, container_name=self.BASE_CONTAINER_NAME + ) - In cases where the Scheduler restarts while a KubernetesPodOperator task is running, - this function will either continue to monitor the existing pod or launch a new pod - based on the `reattach_on_restart` parameter. + if self.do_xcom_push: + result = self.extract_xcom(pod=self.pod) + remote_pod = self.pod_manager.await_pod_completion(self.pod) + finally: + self.cleanup( + pod=self.pod or self.pod_request_obj, + remote_pod=remote_pod, + ) + ti = context['ti'] + ti.xcom_push(key='pod_name', value=self.pod.metadata.name) + ti.xcom_push(key='pod_namespace', value=self.pod.metadata.namespace) + if self.do_xcom_push: + return result - :param labels: labels used to determine if a pod is repeated - :type labels: dict - :param try_numbers_match: do the try numbers match? Only needed for logging purposes - :type try_numbers_match: bool - :param launcher: PodLauncher - :param pod: Pod found with matching labels - """ - if try_numbers_match: - log_line = f"found a running pod with labels {labels} and the same try_number." - else: - log_line = f"found a running pod with labels {labels} but a different try_number." - - # In case of failed pods, should reattach the first time, but only once - # as the task will have already failed. - if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"): - log_line += " Will attach to this pod and monitor instead of starting new one" - self.log.info(log_line) - self.pod = pod - final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod) + def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): + pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None + if pod_phase != PodPhase.SUCCEEDED: + if self.log_events_on_failure: + with _suppress(Exception): + for event in self.pod_manager.read_pod_events(pod).items: + self.log.error("Pod Event: %s - %s", event.reason, event.message) + if not self.is_delete_operator_pod: + with _suppress(Exception): + self.patch_already_checked(pod) + with _suppress(Exception): + self.process_pod_deletion(pod) + raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}') else: - log_line += f"creating pod with labels {labels} and launcher {launcher}" - self.log.info(log_line) - final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher) - return final_state, remote_pod, result + with _suppress(Exception): + self.process_pod_deletion(pod) - @staticmethod - def _get_pod_identifying_label_string(labels) -> str: - label_strings = [ - f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number' - ] - return ','.join(label_strings) + ',already_checked!=True' + def process_pod_deletion(self, pod): + if self.is_delete_operator_pod: + self.log.info("Deleting pod: %s", pod.metadata.name) + self.pod_manager.delete_pod(pod) + else: + self.log.info("skipping deleting pod: %s", pod.metadata.name) - @staticmethod - def _try_numbers_match(context, pod) -> bool: - return pod.metadata.labels['try_number'] == context['ti'].try_number + def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str: + labels = self._get_ti_pod_labels(context, include_try_number=False) + label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())] + return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True' def _set_name(self, name): if name is None: @@ -427,11 +435,29 @@ def _set_name(self, name): validate_key(name, max_length=220) return re.sub(r'[^a-z0-9.-]+', '-', name.lower()) - def create_pod_request_obj(self) -> k8s.V1Pod: + def patch_already_checked(self, pod: k8s.V1Pod): + """Add an "already checked" annotation to ensure we don't reattach on retries""" + pod.metadata.labels[self.POD_CHECKED_KEY] = "True" + body = PodGenerator.serialize_pod(pod) + self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body) + + def on_kill(self) -> None: + if self.pod: + pod = self.pod + kwargs = dict( + name=pod.metadata.name, + namespace=pod.metadata.namespace, + ) + if self.termination_grace_period is not None: + kwargs.update(grace_period_seconds=self.termination_grace_period) + self.client.delete_namespaced_pod(**kwargs) + + def build_pod_request_obj(self, context=None): """ - Creates a V1Pod based on user parameters. Note that a `pod` or `pod_template_file` - will supersede all other values. + Returns V1Pod object based on pod template file, full pod spec, and other operator parameters. + The V1Pod attributes are derived (in order of precedence) from operator params, full pod spec, pod + template file. """ self.log.debug("Creating pod for KubernetesPodOperator task %s", self.task_id) if self.pod_template_file: @@ -450,7 +476,7 @@ def create_pod_request_obj(self) -> k8s.V1Pod: metadata=k8s.V1ObjectMeta( namespace=self.namespace, labels=self.labels, - name=PodGenerator.make_unique_pod_id(self.name), + name=self.name, annotations=self.annotations, ), spec=k8s.V1PodSpec( @@ -461,7 +487,7 @@ def create_pod_request_obj(self) -> k8s.V1Pod: containers=[ k8s.V1Container( image=self.image, - name="base", + name=self.BASE_CONTAINER_NAME, command=self.cmds, ports=self.ports, image_pull_policy=self.image_pull_policy, @@ -486,89 +512,112 @@ def create_pod_request_obj(self) -> k8s.V1Pod: pod = PodGenerator.reconcile_pods(pod_template, pod) + if self.random_name_suffix: + pod.metadata.name = PodGenerator.make_unique_pod_id(pod.metadata.name) + for secret in self.secrets: self.log.debug("Adding secret to task %s", self.task_id) pod = secret.attach_to_pod(pod) if self.do_xcom_push: self.log.debug("Adding xcom sidecar to task %s", self.task_id) pod = xcom_sidecar.add_xcom_sidecar(pod) - return pod - - def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]: - """ - Creates a new pod and monitors for duration of task - :param labels: labels used to track pod - :param launcher: pod launcher that will manage launching and monitoring pods - :return: - """ - self.log.debug( - "Adding KubernetesPodOperator labels to pod before launch for task %s", self.task_id - ) + labels = self._get_ti_pod_labels(context) + self.log.info("Creating pod %s with labels: %s", pod.metadata.name, labels) # Merge Pod Identifying labels with labels passed to operator - self.pod.metadata.labels.update(labels) + pod.metadata.labels.update(labels) # Add Airflow Version to the label # And a label to identify that pod is launched by KubernetesPodOperator - self.pod.metadata.labels.update( + pod.metadata.labels.update( { 'airflow_version': airflow_version.replace('+', '-'), 'kubernetes_pod_operator': 'True', } ) + pod_mutation_hook(pod) + return pod - self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict())) - final_state = None - try: - launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds) - final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs) - except AirflowException: - if self.log_events_on_failure: - for event in launcher.read_pod_events(self.pod).items: - self.log.error("Pod Event: %s - %s", event.reason, event.message) - raise - finally: - if self.is_delete_operator_pod: - self.log.debug("Deleting pod for task %s", self.task_id) - launcher.delete_pod(self.pod) - elif final_state != State.SUCCESS: - self.patch_already_checked(self.pod) - return final_state, remote_pod, result + def dry_run(self) -> None: + """ + Prints out the pod definition that would be created by this operator. + Does not include labels specific to the task instance (since there isn't + one in a dry_run) and excludes all empty elements. + """ + pod = self.build_pod_request_obj() + print(yaml.dump(_prune_dict(pod.to_dict(), mode='strict'))) - def patch_already_checked(self, pod: k8s.V1Pod): - """Add an "already tried annotation to ensure we only retry once""" - pod.metadata.labels["already_checked"] = "True" - body = PodGenerator.serialize_pod(pod) - self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body) - def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]: - """ - Monitors a pod to completion that was created by a previous KubernetesPodOperator +class _suppress(AbstractContextManager): + """ + This behaves the same as ``contextlib.suppress`` but logs the suppressed + exceptions as errors with traceback. - :param launcher: pod launcher that will manage launching and monitoring pods - :param pod: podspec used to find pod using k8s API - :return: - """ - try: - (final_state, remote_pod, result) = launcher.monitor_pod(pod, get_logs=self.get_logs) - finally: - if self.is_delete_operator_pod: - launcher.delete_pod(pod) - if final_state != State.SUCCESS: - if self.log_events_on_failure: - for event in launcher.read_pod_events(pod).items: - self.log.error("Pod Event: %s - %s", event.reason, event.message) - if not self.is_delete_operator_pod: - self.patch_already_checked(pod) - raise AirflowException(f'Pod returned a failure: {final_state}') - return final_state, remote_pod, result + The caught exception is also stored on the context manager instance under + attribute ``exception``. + """ - def on_kill(self) -> None: - if self.pod: - pod: k8s.V1Pod = self.pod - namespace = pod.metadata.namespace - name = pod.metadata.name - kwargs = {} - if self.termination_grace_period is not None: - kwargs = {"grace_period_seconds": self.termination_grace_period} - self.client.delete_namespaced_pod(name=name, namespace=namespace, **kwargs) + def __init__(self, *exceptions): + self._exceptions = exceptions + self.exception = None + + def __enter__(self): + return self + + def __exit__(self, exctype, excinst, exctb): + caught_error = exctype is not None and issubclass(exctype, self._exceptions) + if caught_error: + self.exception = excinst + logger = logging.getLogger() + logger.error(str(excinst), exc_info=True) + return caught_error + + +def _prune_dict(val: Any, mode='strict'): + """ + Note: this is duplicated from ``airflow.utils.helpers.prune_dict``. That one should + be the one used if possible, but this one is included to avoid having to + bump min airflow version. This function will be removed once the min airflow version + is bumped to 2.3. + + Given dict ``val``, returns new dict based on ``val`` with all + empty elements removed. + + What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict' + then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x`` + will be removed if ``bool(x) is False``. + """ + + def is_empty(x): + if mode == 'strict': + return x is None + elif mode == 'truthy': + return bool(x) is False + raise ValueError("allowable values for `mode` include 'truthy' and 'strict'") + + if isinstance(val, dict): + new_dict = {} + for k, v in val.items(): + if is_empty(v): + continue + elif isinstance(v, (list, dict)): + new_val = _prune_dict(v, mode=mode) + if new_val: + new_dict[k] = new_val + else: + new_dict[k] = v + return new_dict + elif isinstance(val, list): + new_list = [] + for v in val: + if is_empty(v): + continue + elif isinstance(v, (list, dict)): + new_val = _prune_dict(v, mode=mode) + if new_val: + new_list.append(new_val) + else: + new_list.append(v) + return new_list + else: + return val diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index 9779292dffb679..10296871efc606 100644 --- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -15,11 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import Optional +from typing import TYPE_CHECKING, Optional, Sequence from airflow.models import BaseOperator from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook +if TYPE_CHECKING: + from airflow.utils.context import Context + class SparkKubernetesOperator(BaseOperator): """ @@ -31,20 +34,15 @@ class SparkKubernetesOperator(BaseOperator): :param application_file: Defines Kubernetes 'custom_resource_definition' of 'sparkApplication' as either a path to a '.json' file or a JSON string. - :type application_file: str :param namespace: kubernetes namespace to put sparkApplication - :type namespace: str :param kubernetes_conn_id: The :ref:`kubernetes connection id ` for the to Kubernetes cluster. - :type kubernetes_conn_id: str :param api_group: kubernetes api group of sparkApplication - :type api_group: str :param api_version: kubernetes api version of sparkApplication - :type api_version: str """ - template_fields = ['application_file', 'namespace'] - template_ext = ('.yaml', '.yml', '.json') + template_fields: Sequence[str] = ('application_file', 'namespace') + template_ext: Sequence[str] = ('.yaml', '.yml', '.json') ui_color = '#f4a460' def __init__( @@ -64,7 +62,7 @@ def __init__( self.api_group = api_group self.api_version = api_version - def execute(self, context): + def execute(self, context: 'Context'): self.log.info("Creating sparkApplication") hook = KubernetesHook(conn_id=self.kubernetes_conn_id) response = hook.create_custom_object( diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml index c7878ba9c416fd..b5b50542573ab4 100644 --- a/airflow/providers/cncf/kubernetes/provider.yaml +++ b/airflow/providers/cncf/kubernetes/provider.yaml @@ -22,6 +22,14 @@ description: | `Kubernetes `__ versions: + - 3.1.2 + - 3.1.1 + - 3.1.0 + - 3.0.2 + - 3.0.1 + - 3.0.0 + - 2.2.0 + - 2.1.0 - 2.0.3 - 2.0.2 - 2.0.1 diff --git a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py index da29e7974f298b..15ac40bcdb90ac 100644 --- a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py +++ b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import Dict, Optional +from typing import TYPE_CHECKING, Optional, Sequence from kubernetes import client @@ -23,6 +23,9 @@ from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook from airflow.sensors.base import BaseSensorOperator +if TYPE_CHECKING: + from airflow.utils.context import Context + class SparkKubernetesSensor(BaseSensorOperator): """ @@ -33,21 +36,15 @@ class SparkKubernetesSensor(BaseSensorOperator): https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication :param application_name: spark Application resource name - :type application_name: str :param namespace: the kubernetes namespace where the sparkApplication reside in - :type namespace: str :param kubernetes_conn_id: The :ref:`kubernetes connection` to Kubernetes cluster. - :type kubernetes_conn_id: str :param attach_log: determines whether logs for driver pod should be appended to the sensor log - :type attach_log: bool :param api_group: kubernetes api group of sparkApplication - :type api_group: str :param api_version: kubernetes api version of sparkApplication - :type api_version: str """ - template_fields = ("application_name", "namespace") + template_fields: Sequence[str] = ("application_name", "namespace") FAILURE_STATES = ("FAILED", "UNKNOWN") SUCCESS_STATES = ("COMPLETED",) @@ -97,7 +94,7 @@ def _log_driver(self, application_state: str, response: dict) -> None: e, ) - def poke(self, context: Dict) -> bool: + def poke(self, context: 'Context') -> bool: self.log.info("Poking: %s", self.application_name) response = self.hook.get_custom_object( group=self.api_group, diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py new file mode 100644 index 00000000000000..4221ac2afd95e9 --- /dev/null +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -0,0 +1,370 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Launches PODs""" +import json +import math +import time +import warnings +from contextlib import closing +from dataclasses import dataclass +from datetime import datetime +from typing import TYPE_CHECKING, Iterable, Optional, Tuple, cast + +import pendulum +import tenacity +from kubernetes import client, watch +from kubernetes.client.models.v1_pod import V1Pod +from kubernetes.client.rest import ApiException +from kubernetes.stream import stream as kubernetes_stream +from pendulum import DateTime +from pendulum.parsing.exceptions import ParserError +from urllib3.exceptions import HTTPError as BaseHTTPError + +from airflow.exceptions import AirflowException +from airflow.kubernetes.kube_client import get_kube_client +from airflow.kubernetes.pod_generator import PodDefaults +from airflow.utils.log.logging_mixin import LoggingMixin + +if TYPE_CHECKING: + from kubernetes.client.models.core_v1_event_list import CoreV1EventList + + +class PodLaunchFailedException(AirflowException): + """When pod launching fails in KubernetesPodOperator.""" + + +def should_retry_start_pod(exception: BaseException) -> bool: + """Check if an Exception indicates a transient error and warrants retrying""" + if isinstance(exception, ApiException): + return exception.status == 409 + return False + + +class PodPhase: + """ + Possible pod phases + See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase. + """ + + PENDING = 'Pending' + RUNNING = 'Running' + FAILED = 'Failed' + SUCCEEDED = 'Succeeded' + + terminal_states = {FAILED, SUCCEEDED} + + +def container_is_running(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is running. + If that container is present and running, returns True. Returns False otherwise. + """ + container_statuses = pod.status.container_statuses if pod and pod.status else None + if not container_statuses: + return False + container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) + if not container_status: + return False + return container_status.state.running is not None + + +@dataclass +class PodLoggingStatus: + """Used for returning the status of the pod and last log time when exiting from `fetch_container_logs`""" + + running: bool + last_log_time: Optional[DateTime] + + +class PodManager(LoggingMixin): + """ + Helper class for creating, monitoring, and otherwise interacting with Kubernetes pods + for use with the KubernetesPodOperator + """ + + def __init__( + self, + kube_client: client.CoreV1Api = None, + in_cluster: bool = True, + cluster_context: Optional[str] = None, + ): + """ + Creates the launcher. + + :param kube_client: kubernetes client + :param in_cluster: whether we are in cluster + :param cluster_context: context of the cluster + """ + super().__init__() + self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context) + self._watch = watch.Watch() + + def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod: + """Runs POD asynchronously""" + sanitized_pod = self._client.api_client.sanitize_for_serialization(pod) + json_pod = json.dumps(sanitized_pod, indent=2) + + self.log.debug('Pod Creation Request: \n%s', json_pod) + try: + resp = self._client.create_namespaced_pod( + body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs + ) + self.log.debug('Pod Creation Response: %s', resp) + except Exception as e: + self.log.exception( + 'Exception when attempting to create Namespaced Pod: %s', str(json_pod).replace("\n", " ") + ) + raise e + return resp + + def delete_pod(self, pod: V1Pod) -> None: + """Deletes POD""" + try: + self._client.delete_namespaced_pod( + pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions() + ) + except ApiException as e: + # If the pod is already deleted + if e.status != 404: + raise + + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_random_exponential(), + reraise=True, + retry=tenacity.retry_if_exception(should_retry_start_pod), + ) + def create_pod(self, pod: V1Pod) -> V1Pod: + """Launches the pod asynchronously.""" + return self.run_pod_async(pod) + + def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None: + """ + Waits for the pod to reach phase other than ``Pending`` + + :param pod: + :param startup_timeout: Timeout (in seconds) for startup of the pod + (if pod is pending for too long, fails task) + :return: + """ + curr_time = datetime.now() + while True: + remote_pod = self.read_pod(pod) + if remote_pod.status.phase != PodPhase.PENDING: + break + self.log.warning("Pod not yet started: %s", pod.metadata.name) + delta = datetime.now() - curr_time + if delta.total_seconds() >= startup_timeout: + msg = ( + f"Pod took longer than {startup_timeout} seconds to start. " + "Check the pod events in kubernetes to determine why." + ) + raise PodLaunchFailedException(msg) + time.sleep(1) + + def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingStatus: + warnings.warn( + "Method `follow_container_logs` is deprecated. Use `fetch_container_logs` instead" + "with option `follow=True`.", + DeprecationWarning, + ) + return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True) + + def fetch_container_logs( + self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None + ) -> PodLoggingStatus: + """ + Follows the logs of container and streams to airflow logging. + Returns when container exits. + """ + + def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) -> Optional[DateTime]: + """ + Tries to follow container logs until container completes. + For a long-running container, sometimes the log read may be interrupted + Such errors of this kind are suppressed. + + Returns the last timestamp observed in logs. + """ + timestamp = None + try: + logs = self.read_pod_logs( + pod=pod, + container_name=container_name, + timestamps=True, + since_seconds=( + math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None + ), + follow=follow, + ) + for line in logs: + timestamp, message = self.parse_log_line(line.decode('utf-8')) + self.log.info(message) + except BaseHTTPError: # Catches errors like ProtocolError(TimeoutError). + self.log.warning( + 'Failed to read logs for pod %s', + pod.metadata.name, + exc_info=True, + ) + return timestamp or since_time + + # note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to + # loop as we do here. But in a long-running process we might temporarily lose connectivity. + # So the looping logic is there to let us resume following the logs. + last_log_time = since_time + while True: + last_log_time = consume_logs(since_time=last_log_time, follow=follow) + if not self.container_is_running(pod, container_name=container_name): + return PodLoggingStatus(running=False, last_log_time=last_log_time) + if not follow: + return PodLoggingStatus(running=True, last_log_time=last_log_time) + else: + self.log.warning( + 'Pod %s log read interrupted but container %s still running', + pod.metadata.name, + container_name, + ) + time.sleep(1) + + def await_container_completion(self, pod: V1Pod, container_name: str) -> None: + while not self.container_is_running(pod=pod, container_name=container_name): + time.sleep(1) + + def await_pod_completion(self, pod: V1Pod) -> V1Pod: + """ + Monitors a pod and returns the final state + + :param pod: pod spec that will be monitored + :return: Tuple[State, Optional[str]] + """ + while True: + remote_pod = self.read_pod(pod) + if remote_pod.status.phase in PodPhase.terminal_states: + break + self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase) + time.sleep(2) + return remote_pod + + def parse_log_line(self, line: str) -> Tuple[Optional[DateTime], str]: + """ + Parse K8s log line and returns the final state + + :param line: k8s log line + :return: timestamp and log message + :rtype: Tuple[str, str] + """ + split_at = line.find(' ') + if split_at == -1: + raise Exception(f'Log not in "{{timestamp}} {{log}}" format. Got: {line}') + timestamp = line[:split_at] + message = line[split_at + 1 :].rstrip() + try: + last_log_time = cast(DateTime, pendulum.parse(timestamp)) + except ParserError: + self.log.error("Error parsing timestamp. Will continue execution but won't update timestamp") + return None, line + return last_log_time, message + + def container_is_running(self, pod: V1Pod, container_name: str) -> bool: + """Reads pod and checks if container is running""" + remote_pod = self.read_pod(pod) + return container_is_running(pod=remote_pod, container_name=container_name) + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + def read_pod_logs( + self, + pod: V1Pod, + container_name: str, + tail_lines: Optional[int] = None, + timestamps: bool = False, + since_seconds: Optional[int] = None, + follow=True, + ) -> Iterable[bytes]: + """Reads log from the POD""" + additional_kwargs = {} + if since_seconds: + additional_kwargs['since_seconds'] = since_seconds + + if tail_lines: + additional_kwargs['tail_lines'] = tail_lines + + try: + return self._client.read_namespaced_pod_log( + name=pod.metadata.name, + namespace=pod.metadata.namespace, + container=container_name, + follow=follow, + timestamps=timestamps, + _preload_content=False, + **additional_kwargs, + ) + except BaseHTTPError: + self.log.exception('There was an error reading the kubernetes API.') + raise + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + def read_pod_events(self, pod: V1Pod) -> "CoreV1EventList": + """Reads events from the POD""" + try: + return self._client.list_namespaced_event( + namespace=pod.metadata.namespace, field_selector=f"involvedObject.name={pod.metadata.name}" + ) + except BaseHTTPError as e: + raise AirflowException(f'There was an error reading the kubernetes API: {e}') + + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) + def read_pod(self, pod: V1Pod) -> V1Pod: + """Read POD information""" + try: + return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace) + except BaseHTTPError as e: + raise AirflowException(f'There was an error reading the kubernetes API: {e}') + + def extract_xcom(self, pod: V1Pod) -> str: + """Retrieves XCom value and kills xcom sidecar container""" + with closing( + kubernetes_stream( + self._client.connect_get_namespaced_pod_exec, + pod.metadata.name, + pod.metadata.namespace, + container=PodDefaults.SIDECAR_CONTAINER_NAME, + command=['/bin/sh'], + stdin=True, + stdout=True, + stderr=True, + tty=False, + _preload_content=False, + ) + ) as resp: + result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json') + self._exec_pod_command(resp, 'kill -s SIGINT 1') + if result is None: + raise AirflowException(f'Failed to extract xcom from pod: {pod.metadata.name}') + return result + + def _exec_pod_command(self, resp, command: str) -> Optional[str]: + if resp.is_open(): + self.log.info('Running command... %s\n', command) + resp.write_stdin(command + '\n') + while resp.is_open(): + resp.update(timeout=1) + if resp.peek_stdout(): + return resp.read_stdout() + if resp.peek_stderr(): + self.log.info("stderr from command: %s", resp.read_stderr()) + break + return None diff --git a/setup.py b/setup.py index 794c33bcd0e94c..0e6ae83a6557ac 100644 --- a/setup.py +++ b/setup.py @@ -406,7 +406,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version ] kubernetes = [ 'cryptography>=2.0.0', - 'kubernetes>=3.0.0, <12.0.0', + 'kubernetes>=21.7.0', ] kylin = ['kylinpy>=2.6'] ldap = [ diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py index bf5dcfc3d0a2d6..b3f41fa9427bc0 100644 --- a/tests/kubernetes/test_client.py +++ b/tests/kubernetes/test_client.py @@ -22,25 +22,21 @@ from kubernetes.client import Configuration from urllib3.connection import HTTPConnection, HTTPSConnection -from airflow.kubernetes.kube_client import ( - RefreshConfiguration, - _disable_verify_ssl, - _enable_tcp_keepalive, - get_kube_client, -) +from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client class TestClient(unittest.TestCase): @mock.patch('airflow.kubernetes.kube_client.config') - def test_load_cluster_config(self, _): - client = get_kube_client(in_cluster=True) - assert not isinstance(client.api_client.configuration, RefreshConfiguration) + def test_load_cluster_config(self, config): + get_kube_client(in_cluster=True) + assert config.load_incluster_config.called + assert config.load_kube_config.not_called @mock.patch('airflow.kubernetes.kube_client.config') - @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file') - def test_load_file_config(self, _, _2): - client = get_kube_client(in_cluster=False) - assert isinstance(client.api_client.configuration, RefreshConfiguration) + def test_load_file_config(self, config): + get_kube_client(in_cluster=False) + assert config.load_incluster_config.not_called + assert config.load_kube_config.called def test_enable_tcp_keepalive(self): socket_options = [ diff --git a/tests/kubernetes/test_refresh_config.py b/tests/kubernetes/test_refresh_config.py deleted file mode 100644 index a0753e2e4b2091..00000000000000 --- a/tests/kubernetes/test_refresh_config.py +++ /dev/null @@ -1,37 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from unittest import TestCase - -import pytest -from pendulum.parsing import ParserError - -from airflow.kubernetes.refresh_config import _parse_timestamp - - -class TestRefreshKubeConfigLoader(TestCase): - def test_parse_timestamp_should_convert_z_timezone_to_unix_timestamp(self): - ts = _parse_timestamp("2020-01-13T13:42:20Z") - assert 1578922940 == ts - - def test_parse_timestamp_should_convert_regular_timezone_to_unix_timestamp(self): - ts = _parse_timestamp("2020-01-13T13:42:20+0600") - assert 1578922940 == ts - - def test_parse_timestamp_should_throw_exception(self): - with pytest.raises(ParserError): - _parse_timestamp("foobar")