diff --git a/UPDATING.md b/UPDATING.md index 6532160396163..a706258a63185 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -84,9 +84,7 @@ https://developers.google.com/style/inclusive-documentation ## Airflow 2.2.5 -### Minimum kubernetes version bumped from 3.0.0 to 21.7.0 - -No change in behavior is expected. This was necessary in order to take advantage of a [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) concerning refreshing of Kubernetes API tokens with EKS, which enabled the removal of some [workaround code](https://github.com/apache/airflow/pull/20759). +No breaking changes. ## Airflow 2.2.4 @@ -1381,7 +1379,7 @@ delete this option. #### `airflow.models.dagbag.DagBag` -Passing `store_serialized_dags` argument to DagBag.__init__ and accessing `DagBag.store_serialized_dags` property +Passing `store_serialized_dags` argument to `DagBag.__init__` and accessing `DagBag.store_serialized_dags` property are deprecated and will be removed in future versions. diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py index aa497158a04ab..1c20bd3b93a74 100644 --- a/airflow/kubernetes/kube_client.py +++ b/airflow/kubernetes/kube_client.py @@ -25,10 +25,39 @@ try: from kubernetes import client, config from kubernetes.client import Configuration + from kubernetes.client.api_client import ApiClient from kubernetes.client.rest import ApiException + from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config + has_kubernetes = True + def _get_kube_config( + in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str] + ) -> Optional[Configuration]: + if in_cluster: + # load_incluster_config sets the default configuration with config populated by k8s + config.load_incluster_config() + return None + else: + # this block can be replaced with just config.load_kube_config once the + # refresh_config module is replaced with the upstream fix + cfg = RefreshConfiguration() + load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context) + return cfg + + def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api: + """ + This is a workaround for supporting api token refresh in the k8s client. + + The function can be replaced with `return client.CoreV1Api()` once the + upstream client supports token refresh.
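+
+        A minimal usage sketch (illustrative only; the names are the ones
+        defined in this module):
+
+            cfg = _get_kube_config(in_cluster=False, cluster_context=None, config_file=None)
+            api = _get_client_with_patched_configuration(cfg)
+            api.list_namespaced_pod(namespace='default')
+
+        With a ``RefreshConfiguration``, each request re-checks token expiry
+        via the ``refresh_api_key`` hook before authenticating.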
+ """ + if cfg: + return client.CoreV1Api(api_client=ApiClient(configuration=cfg)) + else: + return client.CoreV1Api() + def _disable_verify_ssl() -> None: configuration = Configuration() configuration.verify_ssl = False @@ -101,19 +130,17 @@ def get_kube_client( if not has_kubernetes: raise _import_err + if not in_cluster: + if cluster_context is None: + cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None) + if config_file is None: + config_file = conf.get('kubernetes', 'config_file', fallback=None) + if conf.getboolean('kubernetes', 'enable_tcp_keepalive'): _enable_tcp_keepalive() if not conf.getboolean('kubernetes', 'verify_ssl'): _disable_verify_ssl() - if in_cluster: - config.load_incluster_config() - else: - if cluster_context is None: - cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None) - if config_file is None: - config_file = conf.get('kubernetes', 'config_file', fallback=None) - config.load_kube_config(config_file=config_file, context=cluster_context) - - return client.CoreV1Api() + client_conf = _get_kube_config(in_cluster, cluster_context, config_file) + return _get_client_with_patched_configuration(client_conf) diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py new file mode 100644 index 0000000000000..25649510ce0d2 --- /dev/null +++ b/airflow/kubernetes/refresh_config.py @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +NOTE: this module can be removed once upstream client supports token refresh +see: https://github.com/kubernetes-client/python/issues/741 +""" + +import calendar +import logging +import os +import time +from typing import Optional, cast + +import pendulum +from kubernetes.client import Configuration +from kubernetes.config.exec_provider import ExecProvider +from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader + +from airflow.utils import yaml + + +def _parse_timestamp(ts_str: str) -> int: + parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str)) + return calendar.timegm(parsed_dt.timetuple()) + + +class RefreshKubeConfigLoader(KubeConfigLoader): + """ + Patched KubeConfigLoader, this subclass takes expirationTimestamp into + account and sets api key refresh callback hook in Configuration object + """ + + def __init__(self, *args, **kwargs): + KubeConfigLoader.__init__(self, *args, **kwargs) + self.api_key_expire_ts = None + + def _load_from_exec_plugin(self): + """ + We override _load_from_exec_plugin method to also read and store + expiration timestamp for aws-iam-authenticator. It will be later + used for api token refresh. 
+ """ + if 'exec' not in self._user: + return None + try: + status = ExecProvider(self._user['exec']).run() + if 'token' not in status: + logging.error('exec: missing token field in plugin output') + return None + self.token = f"Bearer {status['token']}" + ts_str = status.get('expirationTimestamp') + if ts_str: + self.api_key_expire_ts = _parse_timestamp(ts_str) + return True + except Exception as e: + logging.error(str(e)) + return None + + def refresh_api_key(self, client_configuration): + """Refresh API key if expired""" + if self.api_key_expire_ts and time.time() >= self.api_key_expire_ts: + self.load_and_set(client_configuration) + + def load_and_set(self, client_configuration): + KubeConfigLoader.load_and_set(self, client_configuration) + client_configuration.refresh_api_key = self.refresh_api_key + + +class RefreshConfiguration(Configuration): + """ + Patched Configuration, this subclass takes api key refresh callback hook + into account + """ + + def __init__(self, *args, **kwargs): + Configuration.__init__(self, *args, **kwargs) + self.refresh_api_key = None + + def get_api_key_with_prefix(self, identifier): + if self.refresh_api_key: + self.refresh_api_key(self) + return Configuration.get_api_key_with_prefix(self, identifier) + + +def _get_kube_config_loader_for_yaml_file(filename, **kwargs) -> Optional[RefreshKubeConfigLoader]: + """ + Adapted from the upstream _get_kube_config_loader_for_yaml_file function, changed + KubeConfigLoader to RefreshKubeConfigLoader + """ + with open(filename) as f: + return RefreshKubeConfigLoader( + config_dict=yaml.safe_load(f), + config_base_path=os.path.abspath(os.path.dirname(filename)), + **kwargs, + ) + + +def load_kube_config(client_configuration, config_file=None, context=None): + """ + Adapted from the upstream load_kube_config function, changes: + - removed persist_config argument since it's not being used + - remove `client_configuration is None` branch since we always pass + in client configuration + """ + if config_file is None: + config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION) + + loader = _get_kube_config_loader_for_yaml_file(config_file, active_context=context, config_persister=None) + loader.load_and_set(client_configuration) diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst index 5bcf56941f55e..7f686b22f68a4 100644 --- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst +++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst @@ -19,231 +19,6 @@ Changelog --------- -3.1.2 -..... - -Bug Fixes -~~~~~~~~~ - -* ``Fix mistakenly added install_requires for all providers (#22382)`` -* ``Fix "run_id" k8s and elasticsearch compatibility with Airflow 2.1 (#22385)`` - -Misc -~~~~ - -* ``Remove RefreshConfiguration workaround for K8s token refreshing (#20759)`` - -3.1.1 -..... - -Misc -~~~~~ - -* ``Add Trove classifiers in PyPI (Framework :: Apache Airflow :: Provider)`` - -3.1.0 -..... - -Features -~~~~~~~~ - -* ``Add map_index label to mapped KubernetesPodOperator (#21916)`` -* ``Change KubePodOperator labels from exeuction_date to run_id (#21960)`` - -Misc -~~~~ - -* ``Support for Python 3.10`` -* ``Fix Kubernetes example with wrong operator casing (#21898)`` -* ``Remove types from KPO docstring (#21826)`` - -.. Below changes are excluded from the changelog. Move them to - appropriate section above if needed. Do not delete the lines(!): - * ``Add pre-commit check for docstring param types (#21398)`` - -3.0.2 -..... 
- -Bug Fixes -~~~~~~~~~ - -* ``Add missed deprecations for cncf (#20031)`` - -.. Below changes are excluded from the changelog. Move them to - appropriate section above if needed. Do not delete the lines(!): - * ``Remove ':type' lines now sphinx-autoapi supports typehints (#20951)`` - * ``Make ''delete_pod'' change more prominent in K8s changelog (#20753)`` - * ``Fix MyPy Errors for providers: Tableau, CNCF, Apache (#20654)`` - * ``Add optional features in providers. (#21074)`` - * ``Add documentation for January 2021 providers release (#21257)`` - -3.0.1 -..... - - -Misc -~~~~ - -* ``Update Kubernetes library version (#18797)`` - -.. Below changes are excluded from the changelog. Move them to - appropriate section above if needed. Do not delete the lines(!): - -3.0.0 -..... - -Breaking changes -~~~~~~~~~~~~~~~~ - -* ``Parameter is_delete_operator_pod default is changed to True (#20575)`` -* ``Simplify KubernetesPodOperator (#19572)`` -* ``Move pod_mutation_hook call from PodManager to KubernetesPodOperator (#20596)`` -* ``Rename ''PodLauncher'' to ''PodManager'' (#20576)`` - -Parameter is_delete_operator_pod has new default -```````````````````````````````````````````````` - -Previously, the default for param ``is_delete_operator_pod`` was ``False``, which means that -after a task runs, its pod is not deleted by the operator and remains on the -cluster indefinitely. With this release, we change the default to ``True``. - -Notes on changes to KubernetesPodOperator and PodLauncher -````````````````````````````````````````````````````````` - -.. warning:: Many methods in ``KubernetesPodOperator`` and ``PodLauncher`` have been renamed. - If you have subclassed ``KubernetesPodOperator`` you will need to update your subclass to reflect - the new structure. Additionally, the ``PodStatus`` enum has been renamed to ``PodPhase``. - -Overview -'''''''' - -Generally speaking, if you did not subclass ``KubernetesPodOperator`` and you didn't use the ``PodLauncher`` class directly, -then you don't need to worry about this change. If, however, you have subclassed ``KubernetesPodOperator``, what -follows are some notes on the changes in this release. - -One of the principal goals of the refactor is to clearly separate the "get or create pod" and -"wait for pod completion" phases. Previously, the "wait for pod completion" logic would be invoked -differently depending on whether the operator were to "attach to an existing pod" (e.g. after a -worker failure) or "create a new pod", and this resulted in some code duplication and a bit more -nesting of logic. With this refactor we encapsulate the "get or create" step -into method ``KubernetesPodOperator.get_or_create_pod``, and pull the monitoring and XCom logic up -into the top level of ``execute`` because it can be the same for "attached" pods and "new" pods. - -The ``KubernetesPodOperator.get_or_create_pod`` method tries first to find an existing pod using labels -specific to the task instance (see ``KubernetesPodOperator.find_pod``). -If one does not exist, it creates a pod (``PodManager.create_pod``). - -The "waiting" part of execution has three components. The first step is to wait for the pod to leave the -``Pending`` phase (``KubernetesPodOperator.await_pod_start``). Next, if configured to do so, -the operator will follow the base container logs and forward these logs to the task logger until -the ``base`` container is done.
If not configured to harvest the -logs, the operator will instead call ``KubernetesPodOperator.await_container_completion``. -Either way, we must await container completion before harvesting XCom. After (optionally) extracting the XCom -value from the base container, we await pod completion (``PodManager.await_pod_completion``). - -Previously, depending on whether the pod was "reattached to" (e.g. after a worker failure) or -created anew, the waiting logic may have occurred in either ``handle_pod_overlap`` or ``create_new_pod_for_operator``. - -After the pod terminates, we execute different cleanup tasks depending on whether the pod terminated successfully. - -If the pod terminates *unsuccessfully*, we attempt to log the pod events (``PodLauncher.read_pod_events``). If -additionally the task is configured *not* to delete the pod after termination, we apply a label (``KubernetesPodOperator.patch_already_checked``) -indicating that the pod failed and should not be "reattached to" in a retry. If the task is configured -to delete its pod, we delete it (``KubernetesPodOperator.process_pod_deletion``). Finally, -we raise an AirflowException to fail the task instance. - -If the pod terminates successfully, we delete the pod (``KubernetesPodOperator.process_pod_deletion``) -(if configured to delete the pod) and push XCom (if configured to push XCom). - -Details on method renames, refactors, and deletions -''''''''''''''''''''''''''''''''''''''''''''''''''' - -In ``KubernetesPodOperator``: - -* Method ``create_pod_launcher`` is converted to cached property ``pod_manager``. -* Construction of the k8s ``CoreV1Api`` client is now encapsulated within cached property ``client``. -* Logic to search for an existing pod (e.g. after an airflow worker failure) is moved out of ``execute`` and into method ``find_pod``. -* Method ``handle_pod_overlap`` is removed. Previously it monitored a "found" pod until completion. With this change the pod monitoring (and log following) is orchestrated directly from ``execute``, and it is the same whether it's a "found" pod or a "new" pod. See methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``. -* Method ``create_pod_request_obj`` is renamed ``build_pod_request_obj``. It now takes argument ``context`` in order to add TI-specific pod labels; previously they were added after return. -* Method ``create_labels_for_pod`` is renamed ``_get_ti_pod_labels``. This method doesn't return *all* labels, but only those specific to the TI. We also add parameter ``include_try_number`` to control the inclusion of this label instead of possibly filtering it out later. -* Method ``_get_pod_identifying_label_string`` is renamed ``_build_find_pod_label_selector``. -* Method ``_try_numbers_match`` is removed. -* Method ``create_new_pod_for_operator`` is removed. Previously it would mutate the labels on ``self.pod``, launch the pod, monitor the pod to completion, etc. Now this logic is in part handled by ``get_or_create_pod``, where a new pod will be created if necessary. The monitoring etc. is now orchestrated directly from ``execute``. Again, see the calls to methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``. - -In class ``PodManager`` (formerly ``PodLauncher``): - -* Method ``start_pod`` is removed and split into two methods: ``create_pod`` and ``await_pod_start``.
-* Method ``monitor_pod`` is removed and split into methods ``follow_container_logs``, ``await_container_completion``, ``await_pod_completion`` -* Methods ``pod_not_started``, ``pod_is_running``, ``process_status``, and ``_task_status`` are removed. These were needed due to the way in which pod ``phase`` was mapped to task instance states; but we no longer do such a mapping and instead deal with pod phases directly and untransformed. -* Method ``_extract_xcom`` is renamed ``extract_xcom``. -* Method ``read_pod_logs`` now takes kwarg ``container_name`` - - -Other changes in ``pod_manager.py`` (formerly ``pod_launcher.py``): - -* Class ``pod_launcher.PodLauncher`` renamed to ``pod_manager.PodManager`` -* Enum-like class ``PodStatus`` is renamed ``PodPhase``, and the values are no longer lower-cased. -* The ``airflow.settings.pod_mutation_hook`` is no longer called in - ``cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async``. For ``KubernetesPodOperator``, - mutation now occurs in ``build_pod_request_obj``. -* Parameter ``is_delete_operator_pod`` default is changed to ``True`` so that pods are deleted after task - completion and not left to accumulate. In practice it seems more common to disable pod deletion only on a - temporary basis for debugging purposes and therefore pod deletion is the more sensible default. - -Features -~~~~~~~~ - -* ``Add params config, in_cluster, and cluster_context to KubernetesHook (#19695)`` -* ``Implement dry_run for KubernetesPodOperator (#20573)`` -* ``Clarify docstring for ''build_pod_request_obj'' in K8s providers (#20574)`` - -Bug Fixes -~~~~~~~~~ - -* ``Fix Volume/VolumeMount KPO DeprecationWarning (#19726)`` - -.. Below changes are excluded from the changelog. Move them to - appropriate section above if needed. Do not delete the lines(!): - * ``Fix cached_property MyPy declaration and related MyPy errors (#20226)`` - * ``Use typed Context EVERYWHERE (#20565)`` - * ``Fix template_fields type to have MyPy friendly Sequence type (#20571)`` - * ``Even more typing in operators (template_fields/ext) (#20608)`` - * ``Update documentation for provider December 2021 release (#20523)`` - -2.2.0 -..... - -Features -~~~~~~~~ - -* ``Added namespace as a template field in the KPO. (#19718)`` -* ``Decouple name randomization from name kwarg (#19398)`` - -Bug Fixes -~~~~~~~~~ - -* ``Checking event.status.container_statuses before filtering (#19713)`` -* ``Coalesce 'extra' params to None in KubernetesHook (#19694)`` -* ``Change to correct type in KubernetesPodOperator (#19459)`` - -.. Below changes are excluded from the changelog. Move them to - appropriate section above if needed. Do not delete the lines(!): - * ``Fix duplicate changelog entries (#19759)`` - -2.1.0 -..... - -Features -~~~~~~~~ - -* ``Add more type hints to PodLauncher (#18928)`` -* ``Add more information to PodLauncher timeout error (#17953)`` - -.. Below changes are excluded from the changelog. Move them to - appropriate section above if needed. Do not delete the lines(!): - * ``Update docstring to let users use 'node_selector' (#19057)`` - * ``Add pre-commit hook for common misspelling check in files (#18964)`` - 2.0.3 ..... @@ -269,8 +44,7 @@ Bug Fixes * ``Fix using XCom with ''KubernetesPodOperator'' (#17760)`` * ``Import Hooks lazily individually in providers manager (#17682)`` -.. Below changes are excluded from the changelog. Move them to - appropriate section above if needed. Do not delete the lines(!): +.. 
Review and move the new changes to one of the sections above: * ``Fix messed-up changelog in 3 providers (#17380)`` * ``Fix static checks (#17256)`` * ``Update spark_kubernetes.py (#17237)`` @@ -291,7 +65,10 @@ Bug Fixes .. Below changes are excluded from the changelog. Move them to appropriate section above if needed. Do not delete the lines(!): + * ``Fixed wrongly escaped characters in amazon's changelog (#17020)`` * ``Simplify 'default_args' in Kubernetes example DAGs (#16870)`` + * ``Enable using custom pod launcher in Kubernetes Pod Operator (#16945)`` + * ``Prepare documentation for July release of providers. (#17015)`` * ``Updating task dependencies (#16624)`` * ``Removes pylint from our toolchain (#16682)`` * ``Prepare documentation for July release of providers. (#17015)`` diff --git a/airflow/providers/cncf/kubernetes/__init__.py b/airflow/providers/cncf/kubernetes/__init__.py index 0998e31143fc8..217e5db960782 100644 --- a/airflow/providers/cncf/kubernetes/__init__.py +++ b/airflow/providers/cncf/kubernetes/__init__.py @@ -15,30 +15,3 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import sys - -if sys.version_info < (3, 7): - # This is needed because the Python Kubernetes client >= 12.0 contains a logging object, meaning that - # v1.Pod et al. are not pickleable on Python 3.6. - - # Python 3.7 added this via https://bugs.python.org/issue30520 in 2017 -- but Python 3.6 doesn't have this - # method. - - # This is duplicated/backported from airflow.logging_config in 2.2, but by having it here as well it means - # that we can update the version used in this provider and have it work for older versions - import copyreg - import logging - - def _reduce_Logger(logger): - if logging.getLogger(logger.name) is not logger: - import pickle - - raise pickle.PicklingError('logger cannot be pickled') - return logging.getLogger, (logger.name,) - - def _reduce_RootLogger(logger): - return logging.getLogger, () - - if logging.Logger not in copyreg.dispatch_table: - copyreg.pickle(logging.Logger, _reduce_Logger) - copyreg.pickle(logging.RootLogger, _reduce_RootLogger) diff --git a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py index bf2b8329f8ccd..4c6404f054832 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py +++ b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py @@ -21,16 +21,18 @@ from kubernetes.client import ApiClient, models as k8s from airflow.exceptions import AirflowException +from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources +from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv -def _convert_kube_model_object(obj, new_class): +def _convert_kube_model_object(obj, old_class, new_class): convert_op = getattr(obj, "to_k8s_client_obj", None) if callable(convert_op): return obj.to_k8s_client_obj() elif isinstance(obj, new_class): return obj else: - raise AirflowException(f"Expected {new_class}, got {type(obj)}") + raise AirflowException(f"Expected {old_class} or {new_class}, got {type(obj)}") def _convert_from_dict(obj, new_class): @@ -50,7 +52,9 @@ def convert_volume(volume) -> k8s.V1Volume: :param volume: :return: k8s.V1Volume """ - return _convert_kube_model_object(volume, k8s.V1Volume) + from airflow.providers.cncf.kubernetes.backcompat.volume import Volume 
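+        # Illustrative only (hypothetical values): this converter accepts either
+        # the deprecated backcompat model or the native k8s model, e.g.
+        #   convert_volume(Volume('data', {'persistentVolumeClaim': {'claimName': 'data'}}))
+        #   convert_volume(k8s.V1Volume(name='data'))  # native objects pass through unchanged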
+ + return _convert_kube_model_object(volume, Volume, k8s.V1Volume) def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount: @@ -60,7 +64,9 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount: :param volume_mount: :return: k8s.V1VolumeMount """ - return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount) + from airflow.providers.cncf.kubernetes.backcompat.volume_mount import VolumeMount + + return _convert_kube_model_object(volume_mount, VolumeMount, k8s.V1VolumeMount) def convert_resources(resources) -> k8s.V1ResourceRequirements: @@ -71,10 +77,8 @@ def convert_resources(resources) -> k8s.V1ResourceRequirements: :return: k8s.V1ResourceRequirements """ if isinstance(resources, dict): - from airflow.providers.cncf.kubernetes.backcompat.pod import Resources - resources = Resources(**resources) - return _convert_kube_model_object(resources, k8s.V1ResourceRequirements) + return _convert_kube_model_object(resources, Resources, k8s.V1ResourceRequirements) def convert_port(port) -> k8s.V1ContainerPort: @@ -84,7 +88,7 @@ def convert_port(port) -> k8s.V1ContainerPort: :param port: :return: k8s.V1ContainerPort """ - return _convert_kube_model_object(port, k8s.V1ContainerPort) + return _convert_kube_model_object(port, Port, k8s.V1ContainerPort) def convert_env_vars(env_vars) -> List[k8s.V1EnvVar]: @@ -112,7 +116,7 @@ def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar: :param pod_runtime_info_envs: :return: """ - return _convert_kube_model_object(pod_runtime_info_envs, k8s.V1EnvVar) + return _convert_kube_model_object(pod_runtime_info_envs, PodRuntimeInfoEnv, k8s.V1EnvVar) def convert_image_pull_secrets(image_pull_secrets) -> List[k8s.V1LocalObjectReference]: diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod.py b/airflow/providers/cncf/kubernetes/backcompat/pod.py index 7f18117e18bfa..30a7128b399e8 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/pod.py +++ b/airflow/providers/cncf/kubernetes/backcompat/pod.py @@ -14,29 +14,13 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -""" -Classes for interacting with Kubernetes API. - -This module is deprecated. Please use :mod:`kubernetes.client.models.V1ResourceRequirements` -and :mod:`kubernetes.client.models.V1ContainerPort`. -""" - -import warnings +"""Classes for interacting with Kubernetes API""" from kubernetes.client import models as k8s -warnings.warn( - ( - "This module is deprecated. Please use `kubernetes.client.models.V1ResourceRequirements`" - " and `kubernetes.client.models.V1ContainerPort`." 
- ), - DeprecationWarning, - stacklevel=2, -) - class Resources: - """backwards compat for Resources.""" + """backwards compat for Resources""" __slots__ = ( 'request_memory', @@ -50,12 +34,19 @@ class Resources: """ :param request_memory: requested memory + :type request_memory: str :param request_cpu: requested CPU number + :type request_cpu: float | str :param request_ephemeral_storage: requested ephemeral storage + :type request_ephemeral_storage: str :param limit_memory: limit for memory usage + :type limit_memory: str :param limit_cpu: Limit for CPU used + :type limit_cpu: float | str :param limit_gpu: Limits for GPU used + :type limit_gpu: int :param limit_ephemeral_storage: Limit for ephemeral storage + :type limit_ephemeral_storage: float | str """ def __init__( diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py index f08aecff33a89..f76e0d7d0bb21 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py +++ b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py @@ -14,25 +14,13 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -""" -Classes for interacting with Kubernetes API. - -This module is deprecated. Please use :mod:`kubernetes.client.models.V1EnvVar`. -""" - -import warnings +"""Classes for interacting with Kubernetes API""" import kubernetes.client.models as k8s -warnings.warn( - "This module is deprecated. Please use `kubernetes.client.models.V1EnvVar`.", - DeprecationWarning, - stacklevel=2, -) - class PodRuntimeInfoEnv: - """Defines Pod runtime information as environment variable.""" + """Defines Pod runtime information as environment variable""" def __init__(self, name, field_path): """ @@ -40,7 +28,9 @@ def __init__(self, name, field_path): Full list of options can be found in kubernetes documentation. :param name: the name of the environment variable + :type name: str :param field_path: path to pod runtime info. Ex: metadata.namespace | status.podIP + :type field_path: str """ self.name = name self.field_path = field_path diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume.py b/airflow/providers/cncf/kubernetes/backcompat/volume.py index c51ce8a551e38..e5b4d004ed0ca 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/volume.py +++ b/airflow/providers/cncf/kubernetes/backcompat/volume.py @@ -35,8 +35,10 @@ def __init__(self, name, configs): and Persistent Volumes :param name: the name of the volume + :type name: str :param configs: dictionary of any features needed for volume. We purposely keep this vague since there are multiple volume types with changing configs. + :type configs: dict """ self.name = name self.configs = configs diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py index f9faed9d04a97..b77ab47cd8a3a 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py +++ b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py @@ -38,9 +38,13 @@ def __init__(self, name, mount_path, sub_path, read_only): running container.
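+        For example (illustrative), ``convert_volume_mount`` maps the legacy object
+        ``VolumeMount('data', mount_path='/root/data', sub_path=None, read_only=True)``
+        to the equivalent ``kubernetes.client.models.V1VolumeMount``.
+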
:param name: the name of the volume mount + :type name: str :param mount_path: + :type mount_path: str :param sub_path: subpath within the volume mount + :type sub_path: Optional[str] :param read_only: whether to access pod with read-only mode + :type read_only: bool """ self.name = name self.mount_path = mount_path diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py b/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py deleted file mode 100644 index b65dae9f4e52a..0000000000000 --- a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py +++ /dev/null @@ -1,163 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -This is an example dag for using the KubernetesPodOperator. -""" - -from datetime import datetime - -from kubernetes.client import models as k8s - -from airflow import DAG -from airflow.kubernetes.secret import Secret -from airflow.operators.bash import BashOperator -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator - -# [START howto_operator_k8s_cluster_resources] -secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn') -secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn') -secret_all_keys = Secret('env', None, 'airflow-secrets-2') -volume_mount = k8s.V1VolumeMount( - name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True -) - -configmaps = [ - k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-1')), - k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-2')), -] - -volume = k8s.V1Volume( - name='test-volume', - persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'), -) - -port = k8s.V1ContainerPort(name='http', container_port=80) - -init_container_volume_mounts = [ - k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True) -] - -init_environments = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')] - -init_container = k8s.V1Container( - name="init-container", - image="ubuntu:16.04", - env=init_environments, - volume_mounts=init_container_volume_mounts, - command=["bash", "-cx"], - args=["echo 10"], -) - -affinity = k8s.V1Affinity( - node_affinity=k8s.V1NodeAffinity( - preferred_during_scheduling_ignored_during_execution=[ - k8s.V1PreferredSchedulingTerm( - weight=1, - preference=k8s.V1NodeSelectorTerm( - match_expressions=[ - k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"]) - ] - ), - ) - ] - ), - pod_affinity=k8s.V1PodAffinity( - required_during_scheduling_ignored_during_execution=[ - k8s.V1WeightedPodAffinityTerm( - weight=1, - 
pod_affinity_term=k8s.V1PodAffinityTerm( - label_selector=k8s.V1LabelSelector( - match_expressions=[ - k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1") - ] - ), - topology_key="failure-domain.beta.kubernetes.io/zone", - ), - ) - ] - ), -) - -tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")] - -# [END howto_operator_k8s_cluster_resources] - - -with DAG( - dag_id='example_kubernetes_operator', - schedule_interval=None, - start_date=datetime(2021, 1, 1), - tags=['example'], -) as dag: - k = KubernetesPodOperator( - namespace='default', - image="ubuntu:16.04", - cmds=["bash", "-cx"], - arguments=["echo", "10"], - labels={"foo": "bar"}, - secrets=[secret_file, secret_env, secret_all_keys], - ports=[port], - volumes=[volume], - volume_mounts=[volume_mount], - env_from=configmaps, - name="airflow-test-pod", - task_id="task", - affinity=affinity, - is_delete_operator_pod=True, - hostnetwork=False, - tolerations=tolerations, - init_containers=[init_container], - priority_class_name="medium", - ) - - # [START howto_operator_k8s_private_image] - quay_k8s = KubernetesPodOperator( - namespace='default', - image='quay.io/apache/bash', - image_pull_secrets=[k8s.V1LocalObjectReference('testquay')], - cmds=["bash", "-cx"], - arguments=["echo", "10", "echo pwd"], - labels={"foo": "bar"}, - name="airflow-private-image-pod", - is_delete_operator_pod=True, - in_cluster=True, - task_id="task-two", - get_logs=True, - ) - # [END howto_operator_k8s_private_image] - - # [START howto_operator_k8s_write_xcom] - write_xcom = KubernetesPodOperator( - namespace='default', - image='alpine', - cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"], - name="write-xcom", - do_xcom_push=True, - is_delete_operator_pod=True, - in_cluster=True, - task_id="write-xcom", - get_logs=True, - ) - - pod_task_xcom_result = BashOperator( - bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", - task_id="pod_task_xcom_result", - ) - # [END howto_operator_k8s_write_xcom] - - write_xcom >> pod_task_xcom_result diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 38305031dd022..e230dba34b017 100644 --- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -14,21 +14,19 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import sys import tempfile from typing import Any, Dict, Generator, Optional, Tuple, Union -if sys.version_info >= (3, 8): +try: from functools import cached_property -else: +except ImportError: from cached_property import cached_property - from kubernetes import client, config, watch try: import airflow.utils.yaml as yaml except ImportError: - import yaml # type: ignore[no-redef] + import yaml from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook @@ -61,6 +59,7 @@ class KubernetesHook(BaseHook): :param conn_id: The :ref:`kubernetes connection ` to Kubernetes cluster. 
+ :type conn_id: str """ conn_name_attr = 'kubernetes_conn_id' @@ -86,13 +85,10 @@ def get_connection_form_widgets() -> Dict[str, Any]: "extra__kubernetes__namespace": StringField( lazy_gettext('Namespace'), widget=BS3TextFieldWidget() ), - "extra__kubernetes__cluster_context": StringField( - lazy_gettext('Cluster context'), widget=BS3TextFieldWidget() - ), } @staticmethod - def get_ui_field_behaviour() -> Dict[str, Any]: + def get_ui_field_behaviour() -> Dict: """Returns custom field behaviour""" return { "hidden_fields": ['host', 'schema', 'login', 'password', 'port', 'extra'], @@ -100,49 +96,25 @@ def get_ui_field_behaviour() -> Dict[str, Any]: } def __init__( - self, - conn_id: Optional[str] = default_conn_name, - client_configuration: Optional[client.Configuration] = None, - cluster_context: Optional[str] = None, - config_file: Optional[str] = None, - in_cluster: Optional[bool] = None, + self, conn_id: str = default_conn_name, client_configuration: Optional[client.Configuration] = None ) -> None: super().__init__() self.conn_id = conn_id self.client_configuration = client_configuration - self.cluster_context = cluster_context - self.config_file = config_file - self.in_cluster = in_cluster - - @staticmethod - def _coalesce_param(*params): - for param in params: - if param is not None: - return param def get_conn(self) -> Any: """Returns kubernetes api session for use with requests""" - if self.conn_id: - connection = self.get_connection(self.conn_id) - extras = connection.extra_dejson - else: - extras = {} - in_cluster = self._coalesce_param( - self.in_cluster, extras.get("extra__kubernetes__in_cluster") or None - ) - cluster_context = self._coalesce_param( - self.cluster_context, extras.get("extra__kubernetes__cluster_context") or None - ) - kubeconfig_path = self._coalesce_param( - self.config_file, extras.get("extra__kubernetes__kube_config_path") or None - ) - kubeconfig = extras.get("extra__kubernetes__kube_config") or None + connection = self.get_connection(self.conn_id) + extras = connection.extra_dejson + in_cluster = extras.get("extra__kubernetes__in_cluster") + kubeconfig_path = extras.get("extra__kubernetes__kube_config_path") + kubeconfig = extras.get("extra__kubernetes__kube_config") num_selected_configuration = len([o for o in [in_cluster, kubeconfig, kubeconfig_path] if o]) if num_selected_configuration > 1: raise AirflowException( - "Invalid connection configuration. Options kube_config_path, " - "kube_config, in_cluster are mutually exclusive. " + "Invalid connection configuration. Options extra__kubernetes__kube_config_path, " + "extra__kubernetes__kube_config, extra__kubernetes__in_cluster are mutually exclusive. " "You can only use one option at a time." 
) if in_cluster: @@ -153,9 +125,7 @@ def get_conn(self) -> Any: if kubeconfig_path is not None: self.log.debug("loading kube_config from: %s", kubeconfig_path) config.load_kube_config( - config_file=kubeconfig_path, - client_configuration=self.client_configuration, - context=cluster_context, + config_file=kubeconfig_path, client_configuration=self.client_configuration ) return client.ApiClient() @@ -165,17 +135,12 @@ def get_conn(self) -> Any: temp_config.write(kubeconfig.encode()) temp_config.flush() config.load_kube_config( - config_file=temp_config.name, - client_configuration=self.client_configuration, - context=cluster_context, + config_file=temp_config.name, client_configuration=self.client_configuration ) return client.ApiClient() self.log.debug("loading kube_config from: default file") - config.load_kube_config( - client_configuration=self.client_configuration, - context=cluster_context, - ) + config.load_kube_config(client_configuration=self.client_configuration) return client.ApiClient() @cached_property @@ -183,10 +148,6 @@ def api_client(self) -> Any: """Cached Kubernetes API client""" return self.get_conn() - @cached_property - def core_v1_client(self): - return client.CoreV1Api(api_client=self.api_client) - def create_custom_object( self, group: str, version: str, plural: str, body: Union[str, dict], namespace: Optional[str] = None ): @@ -194,10 +155,15 @@ def create_custom_object( Creates custom resource definition object in Kubernetes :param group: api group + :type group: str :param version: api version + :type version: str :param plural: api plural + :type plural: str :param body: crd object definition + :type body: Union[str, dict] :param namespace: kubernetes namespace + :type namespace: str """ api = client.CustomObjectsApi(self.api_client) if namespace is None: @@ -220,10 +186,15 @@ def get_custom_object( Get custom resource definition object from Kubernetes :param group: api group + :type group: str :param version: api version + :type version: str :param plural: api plural + :type plural: str :param name: crd object name + :type name: str :param namespace: kubernetes namespace + :type namespace: str """ api = client.CustomObjectsApi(self.api_client) if namespace is None: @@ -236,14 +207,12 @@ def get_custom_object( except client.rest.ApiException as e: raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n") - def get_namespace(self) -> Optional[str]: + def get_namespace(self) -> str: """Returns the namespace that defined in the connection""" - if self.conn_id: - connection = self.get_connection(self.conn_id) - extras = connection.extra_dejson - namespace = extras.get("extra__kubernetes__namespace", "default") - return namespace - return None + connection = self.get_connection(self.conn_id) + extras = connection.extra_dejson + namespace = extras.get("extra__kubernetes__namespace", "default") + return namespace def get_pod_log_stream( self, @@ -255,8 +224,10 @@ def get_pod_log_stream( Retrieves a log stream for a container in a kubernetes pod. :param pod_name: pod name + :type pod_name: str :param container: container name :param namespace: kubernetes namespace + :type namespace: str """ api = client.CoreV1Api(self.api_client) watcher = watch.Watch() @@ -280,8 +251,10 @@ def get_pod_logs( Retrieves a container's log from the specified pod. 
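+        For example (illustrative):
+
+            hook.get_pod_logs('my-pod', container='base', namespace='default')
+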
:param pod_name: pod name + :type pod_name: str :param container: container name :param namespace: kubernetes namespace + :type namespace: str """ api = client.CoreV1Api(self.api_client) return api.read_namespaced_pod_log( diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index dd127fee76ee1..747f8b024e667 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -15,16 +15,17 @@ # specific language governing permissions and limitations # under the License. """Executes task in a Kubernetes POD""" -import json -import logging import re -import sys import warnings -from contextlib import AbstractContextManager -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type from kubernetes.client import CoreV1Api, models as k8s +try: + import airflow.utils.yaml as yaml +except ImportError: + import yaml + from airflow.exceptions import AirflowException from airflow.kubernetes import kube_client, pod_generator from airflow.kubernetes.pod_generator import PodGenerator @@ -42,27 +43,15 @@ convert_volume, convert_volume_mount, ) -from airflow.providers.cncf.kubernetes.utils import xcom_sidecar -from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase -from airflow.settings import pod_mutation_hook -from airflow.utils import yaml +from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv +from airflow.providers.cncf.kubernetes.utils import pod_launcher, xcom_sidecar from airflow.utils.helpers import validate_key +from airflow.utils.state import State from airflow.version import version as airflow_version -if sys.version_info >= (3, 8): - from functools import cached_property -else: - from cached_property import cached_property - if TYPE_CHECKING: import jinja2 - from airflow.utils.context import Context - - -class PodReattachFailure(AirflowException): - """When we expect to be able to find a pod but cannot.""" - class KubernetesPodOperator(BaseOperator): """ @@ -79,66 +68,101 @@ class KubernetesPodOperator(BaseOperator): simplifies the authorization process. :param namespace: the namespace to run within kubernetes. + :type namespace: str :param image: Docker image you wish to launch. Defaults to hub.docker.com, but fully qualified URLS will point to custom repositories. (templated) + :type image: str :param name: name of the pod in which the task will run, will be used (plus a random - suffix if random_name_suffix is True) to generate a pod id (DNS-1123 subdomain, - containing only [a-z0-9.-]). - :param random_name_suffix: if True, will generate a random suffix. + suffix) to generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]). + :type name: str :param cmds: entrypoint of the container. (templated) The docker images's entrypoint is used if this is not provided. + :type cmds: list[str] :param arguments: arguments of the entrypoint. (templated) The docker image's CMD is used if this is not provided. - :param ports: ports for the launched pod. - :param volume_mounts: volumeMounts for the launched pod. - :param volumes: volumes for the launched pod. Includes ConfigMaps and PersistentVolumes. + :type arguments: list[str] + :param ports: ports for launched pod. 
+ :type ports: list[k8s.V1ContainerPort] + :param volume_mounts: volumeMounts for launched pod. + :type volume_mounts: list[k8s.V1VolumeMount] + :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes. + :type volumes: list[k8s.V1Volume] :param env_vars: Environment variables initialized in the container. (templated) + :type env_vars: list[k8s.V1EnvVar] :param secrets: Kubernetes secrets to inject in the container. They can be exposed as environment vars or files in a volume. + :type secrets: list[airflow.kubernetes.secret.Secret] :param in_cluster: run kubernetes client with in_cluster configuration. + :type in_cluster: bool :param cluster_context: context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used. + :type cluster_context: str :param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor + :type reattach_on_restart: bool :param labels: labels to apply to the Pod. (templated) + :type labels: dict :param startup_timeout_seconds: timeout in seconds to startup the pod. + :type startup_timeout_seconds: int :param get_logs: get the stdout of the container as logs of the tasks. + :type get_logs: bool :param image_pull_policy: Specify a policy to cache or always pull an image. + :type image_pull_policy: str :param annotations: non-identifying metadata you can attach to the Pod. Can be a large range of data, and can include characters that are not permitted by labels. - :param resources: resources for the launched pod. - :param affinity: affinity scheduling rules for the launched pod. + :type annotations: dict + :param resources: A dict containing resources requests and limits. + Possible keys are request_memory, request_cpu, limit_memory, limit_cpu, + and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources. + See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container + :type resources: k8s.V1ResourceRequirements + :param affinity: A dict containing a group of affinity scheduling rules. + :type affinity: k8s.V1Affinity :param config_file: The path to the Kubernetes config file. (templated) If not specified, default value is ``~/.kube/config`` - :param node_selector: A dict containing a group of scheduling rules. + :type config_file: str + :param node_selectors: A dict containing a group of scheduling rules. + :type node_selectors: dict :param image_pull_secrets: Any image pull secrets to be given to the pod. If more than one secret is required, provide a comma separated list: secret_a,secret_b + :type image_pull_secrets: List[k8s.V1LocalObjectReference] :param service_account_name: Name of the service account + :type service_account_name: str :param is_delete_operator_pod: What to do when the pod reaches its final - state, or the execution is interrupted. If True (default), delete the - pod; if False, leave the pod. + state, or the execution is interrupted. + If False (default): do nothing, If True: delete the pod + :type is_delete_operator_pod: bool :param hostnetwork: If True enable host networking on the pod. + :type hostnetwork: bool :param tolerations: A list of kubernetes tolerations. + :type tolerations: List[k8s.V1Toleration] :param security_context: security options the pod should run with (PodSecurityContext). + :type security_context: dict :param dnspolicy: dnspolicy for the pod. 
+ :type dnspolicy: str :param schedulername: Specify a schedulername for the pod + :type schedulername: str :param full_pod_spec: The complete podSpec + :type full_pod_spec: kubernetes.client.models.V1Pod :param init_containers: init container for the launched Pod + :type init_containers: list[kubernetes.client.models.V1Container] :param log_events_on_failure: Log the pod's events if a failure occurs + :type log_events_on_failure: bool :param do_xcom_push: If True, the content of the file /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes. + :type do_xcom_push: bool :param pod_template_file: path to pod template file (templated) + :type pod_template_file: str :param priority_class_name: priority class name for the launched Pod + :type priority_class_name: str :param termination_grace_period: Termination grace period if task killed in UI, defaults to kubernetes default + :type termination_grace_period: int """ - BASE_CONTAINER_NAME = 'base' - POD_CHECKED_KEY = 'already_checked' - - template_fields: Sequence[str] = ( + template_fields: Iterable[str] = ( 'image', 'cmds', 'arguments', @@ -146,16 +170,16 @@ class KubernetesPodOperator(BaseOperator): 'labels', 'config_file', 'pod_template_file', - 'namespace', ) + # fmt: off def __init__( + # fmt: on self, *, namespace: Optional[str] = None, image: Optional[str] = None, name: Optional[str] = None, - random_name_suffix: Optional[bool] = True, cmds: Optional[List[str]] = None, arguments: Optional[List[str]] = None, ports: Optional[List[k8s.V1ContainerPort]] = None, @@ -179,7 +203,7 @@ def __init__( node_selector: Optional[dict] = None, image_pull_secrets: Optional[List[k8s.V1LocalObjectReference]] = None, service_account_name: Optional[str] = None, - is_delete_operator_pod: bool = True, + is_delete_operator_pod: bool = False, hostnetwork: bool = False, tolerations: Optional[List[k8s.V1Toleration]] = None, security_context: Optional[Dict] = None, @@ -191,9 +215,9 @@ def __init__( do_xcom_push: bool = False, pod_template_file: Optional[str] = None, priority_class_name: Optional[str] = None, - pod_runtime_info_envs: Optional[List[k8s.V1EnvVar]] = None, + pod_runtime_info_envs: List[PodRuntimeInfoEnv] = None, termination_grace_period: Optional[int] = None, - configmaps: Optional[List[str]] = None, + configmaps: Optional[str] = None, **kwargs, ) -> None: if kwargs.get('xcom_push') is not None: @@ -240,9 +264,8 @@ def __init__( self.service_account_name = service_account_name self.is_delete_operator_pod = is_delete_operator_pod self.hostnetwork = hostnetwork - self.tolerations = ( - [convert_toleration(toleration) for toleration in tolerations] if tolerations else [] - ) + self.tolerations = [convert_toleration(toleration) for toleration in tolerations] \ + if tolerations else [] self.security_context = security_context or {} self.dnspolicy = dnspolicy self.schedulername = schedulername @@ -252,15 +275,14 @@ def __init__( self.priority_class_name = priority_class_name self.pod_template_file = pod_template_file self.name = self._set_name(name) - self.random_name_suffix = random_name_suffix self.termination_grace_period = termination_grace_period - self.pod_request_obj: Optional[k8s.V1Pod] = None - self.pod: Optional[k8s.V1Pod] = None + self.client: CoreV1Api = None + self.pod: k8s.V1Pod = None def _render_nested_template_fields( self, content: Any, - context: 'Context', + context: Dict, jinja_env: "jinja2.Environment", seen_oids: set, ) -> None: @@ -269,31 +291,27 @@ def _render_nested_template_fields( 
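        # If ``content`` is a not-yet-rendered k8s.V1EnvVar, template only its
        # 'value' and 'name' fields; any other object falls through to the
        # parent implementation below.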
self._do_render_template_fields(content, ('value', 'name'), context, jinja_env, seen_oids) return - super()._render_nested_template_fields(content, context, jinja_env, seen_oids) + super()._render_nested_template_fields( + content, + context, + jinja_env, + seen_oids + ) @staticmethod - def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool = True) -> dict: + def create_labels_for_pod(context) -> dict: """ Generate labels for the pod to track the pod in case of Operator crash :param context: task context provided by airflow DAG :return: dict """ - if not context: - return {} - - ti = context['ti'] - run_id = context['run_id'] - - labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id} - - # If running on Airflow 2.3+: - map_index = getattr(ti, 'map_index', -1) - if map_index >= 0: - labels['map_index'] = map_index - - if include_try_number: - labels.update(try_number=ti.try_number) + labels = { + 'dag_id': context['dag'].dag_id, + 'task_id': context['task'].task_id, + 'execution_date': context['ts'], + 'try_number': context['ti'].try_number, + } # In the case of sub dags this is just useful if context['dag'].is_subdag: labels['parent_dag_id'] = context['dag'].parent_dag.dag_id @@ -304,127 +322,101 @@ def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool labels[label_id] = safe_label return labels - @cached_property - def pod_manager(self) -> PodManager: - return PodManager(kube_client=self.client) + def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]: + return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push) - @cached_property - def client(self) -> CoreV1Api: - # todo: use airflow Connection / hook to authenticate to the cluster - kwargs: Dict[str, Any] = dict( - cluster_context=self.cluster_context, - config_file=self.config_file, - ) - if self.in_cluster is not None: - kwargs.update(in_cluster=self.in_cluster) - return kube_client.get_kube_client(**kwargs) - - def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]: - """Returns an already-running pod for this task instance if one exists.""" - label_selector = self._build_find_pod_label_selector(context) - pod_list = self.client.list_namespaced_pod( - namespace=namespace, - label_selector=label_selector, - ).items - - pod = None - num_pods = len(pod_list) - if num_pods > 1: - raise AirflowException(f'More than one pod running with labels {label_selector}') - elif num_pods == 1: - pod = pod_list[0] - self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels) - self.log.info("`try_number` of task_instance: %s", context['ti'].try_number) - self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number']) - return pod + def execute(self, context) -> Optional[str]: + try: + if self.in_cluster is not None: + client = kube_client.get_kube_client( + in_cluster=self.in_cluster, + cluster_context=self.cluster_context, + config_file=self.config_file, + ) + else: + client = kube_client.get_kube_client( + cluster_context=self.cluster_context, config_file=self.config_file + ) - def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context): - if self.reattach_on_restart: - pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context) - if pod: - return pod - self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict())) - self.pod_manager.create_pod(pod=pod_request_obj) - return pod_request_obj + self.client = client - def await_pod_start(self, 
pod): - try: - self.pod_manager.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds) - except PodLaunchFailedException: - if self.log_events_on_failure: - for event in self.pod_manager.read_pod_events(pod).items: - self.log.error("Pod Event: %s - %s", event.reason, event.message) - raise + self.pod = self.create_pod_request_obj() + self.namespace = self.pod.metadata.namespace - def extract_xcom(self, pod): - """Retrieves xcom value and kills xcom sidecar container""" - result = self.pod_manager.extract_xcom(pod) - self.log.info("xcom result: \n%s", result) - return json.loads(result) + # Add combination of labels to uniquely identify a running pod + labels = self.create_labels_for_pod(context) - def execute(self, context: 'Context'): - remote_pod = None - try: - self.pod_request_obj = self.build_pod_request_obj(context) - self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill` - pod_request_obj=self.pod_request_obj, - context=context, - ) - self.await_pod_start(pod=self.pod) + label_selector = self._get_pod_identifying_label_string(labels) - if self.get_logs: - self.pod_manager.fetch_container_logs( - pod=self.pod, - container_name=self.BASE_CONTAINER_NAME, - follow=True, - ) - else: - self.pod_manager.await_container_completion( - pod=self.pod, container_name=self.BASE_CONTAINER_NAME + pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector) + + if len(pod_list.items) > 1 and self.reattach_on_restart: + raise AirflowException( + f'More than one pod running with labels: {label_selector}' ) - if self.do_xcom_push: - result = self.extract_xcom(pod=self.pod) - remote_pod = self.pod_manager.await_pod_completion(self.pod) - finally: - self.cleanup( - pod=self.pod or self.pod_request_obj, - remote_pod=remote_pod, - ) - ti = context['ti'] - ti.xcom_push(key='pod_name', value=self.pod.metadata.name) - ti.xcom_push(key='pod_namespace', value=self.pod.metadata.namespace) - if self.do_xcom_push: + launcher = self.create_pod_launcher() + + if len(pod_list.items) == 1: + try_numbers_match = self._try_numbers_match(context, pod_list.items[0]) + final_state, remote_pod, result = self.handle_pod_overlap( + labels, try_numbers_match, launcher, pod_list.items[0] + ) + else: + self.log.info("creating pod with labels %s and launcher %s", labels, launcher) + final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher) + if final_state != State.SUCCESS: + raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}') + context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name) + context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace) return result + except AirflowException as ex: + raise AirflowException(f'Pod Launching failed: {ex}') - def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): - pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None - if pod_phase != PodPhase.SUCCEEDED: - if self.log_events_on_failure: - with _suppress(Exception): - for event in self.pod_manager.read_pod_events(pod).items: - self.log.error("Pod Event: %s - %s", event.reason, event.message) - if not self.is_delete_operator_pod: - with _suppress(Exception): - self.patch_already_checked(pod) - with _suppress(Exception): - self.process_pod_deletion(pod) - raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}') - else: - with _suppress(Exception): - self.process_pod_deletion(pod) + def handle_pod_overlap( + self, 
labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
+    ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
+        """
+        In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
+        this function will either continue to monitor the existing pod or launch a new pod
+        based on the `reattach_on_restart` parameter.
-    def process_pod_deletion(self, pod):
-        if self.is_delete_operator_pod:
-            self.log.info("Deleting pod: %s", pod.metadata.name)
-            self.pod_manager.delete_pod(pod)
+        :param labels: labels used to determine if a pod is repeated
+        :type labels: dict
+        :param try_numbers_match: do the try numbers match? Only needed for logging purposes
+        :type try_numbers_match: bool
+        :param launcher: PodLauncher that will monitor or launch the pod
+        :param pod: pod found with matching labels
+        """
+        if try_numbers_match:
+            log_line = f"Found a running pod with labels {labels} and the same try_number."
+        else:
+            log_line = f"Found a running pod with labels {labels} but a different try_number."
+
+        # In case of failed pods, should reattach the first time, but only once
+        # as the task will have already failed.
+        if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
+            log_line += " Will attach to this pod and monitor it instead of starting a new one."
+            self.log.info(log_line)
+            self.pod = pod
+            final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod)
         else:
-            self.log.info("skipping deleting pod: %s", pod.metadata.name)
+            log_line += f" Creating a new pod with labels {labels} and launcher {launcher}."
+            self.log.info(log_line)
+            final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
+        return final_state, remote_pod, result

-    def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str:
-        labels = self._get_ti_pod_labels(context, include_try_number=False)
-        label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())]
-        return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True'
+    @staticmethod
+    def _get_pod_identifying_label_string(labels) -> str:
+        label_strings = [
+            f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
+        ]
+        return ','.join(label_strings) + ',already_checked!=True'
+
+    @staticmethod
+    def _try_numbers_match(context, pod) -> bool:
+        # Pod labels are always strings, while TaskInstance.try_number is an int.
+        return pod.metadata.labels['try_number'] == str(context['ti'].try_number)

     def _set_name(self, name):
         if name is None:
@@ -435,29 +427,11 @@ def _set_name(self, name):
         validate_key(name, max_length=220)
         return re.sub(r'[^a-z0-9.-]+', '-', name.lower())

-    def patch_already_checked(self, pod: k8s.V1Pod):
-        """Add an "already checked" annotation to ensure we don't reattach on retries"""
-        pod.metadata.labels[self.POD_CHECKED_KEY] = "True"
-        body = PodGenerator.serialize_pod(pod)
-        self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
-
-    def on_kill(self) -> None:
-        if self.pod:
-            pod = self.pod
-            kwargs = dict(
-                name=pod.metadata.name,
-                namespace=pod.metadata.namespace,
-            )
-            if self.termination_grace_period is not None:
-                kwargs.update(grace_period_seconds=self.termination_grace_period)
-            self.client.delete_namespaced_pod(**kwargs)
-
-    def build_pod_request_obj(self, context=None):
+    def create_pod_request_obj(self) -> k8s.V1Pod:
         """
-        Returns V1Pod object based on pod template file, full pod spec, and other operator parameters.
+        Creates a V1Pod based on user parameters. Note that a `pod` or `pod_template_file`
+        will supersede all other values.
- The V1Pod attributes are derived (in order of precedence) from operator params, full pod spec, pod - template file. """ self.log.debug("Creating pod for KubernetesPodOperator task %s", self.task_id) if self.pod_template_file: @@ -476,7 +450,7 @@ def build_pod_request_obj(self, context=None): metadata=k8s.V1ObjectMeta( namespace=self.namespace, labels=self.labels, - name=self.name, + name=PodGenerator.make_unique_pod_id(self.name), annotations=self.annotations, ), spec=k8s.V1PodSpec( @@ -487,7 +461,7 @@ def build_pod_request_obj(self, context=None): containers=[ k8s.V1Container( image=self.image, - name=self.BASE_CONTAINER_NAME, + name="base", command=self.cmds, ports=self.ports, image_pull_policy=self.image_pull_policy, @@ -512,112 +486,89 @@ def build_pod_request_obj(self, context=None): pod = PodGenerator.reconcile_pods(pod_template, pod) - if self.random_name_suffix: - pod.metadata.name = PodGenerator.make_unique_pod_id(pod.metadata.name) - for secret in self.secrets: self.log.debug("Adding secret to task %s", self.task_id) pod = secret.attach_to_pod(pod) if self.do_xcom_push: self.log.debug("Adding xcom sidecar to task %s", self.task_id) pod = xcom_sidecar.add_xcom_sidecar(pod) + return pod + + def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]: + """ + Creates a new pod and monitors for duration of task - labels = self._get_ti_pod_labels(context) - self.log.info("Creating pod %s with labels: %s", pod.metadata.name, labels) + :param labels: labels used to track pod + :param launcher: pod launcher that will manage launching and monitoring pods + :return: + """ + self.log.debug( + "Adding KubernetesPodOperator labels to pod before launch for task %s", self.task_id + ) # Merge Pod Identifying labels with labels passed to operator - pod.metadata.labels.update(labels) + self.pod.metadata.labels.update(labels) # Add Airflow Version to the label # And a label to identify that pod is launched by KubernetesPodOperator - pod.metadata.labels.update( + self.pod.metadata.labels.update( { 'airflow_version': airflow_version.replace('+', '-'), 'kubernetes_pod_operator': 'True', } ) - pod_mutation_hook(pod) - return pod - - def dry_run(self) -> None: - """ - Prints out the pod definition that would be created by this operator. - Does not include labels specific to the task instance (since there isn't - one in a dry_run) and excludes all empty elements. - """ - pod = self.build_pod_request_obj() - print(yaml.dump(_prune_dict(pod.to_dict(), mode='strict'))) - - -class _suppress(AbstractContextManager): - """ - This behaves the same as ``contextlib.suppress`` but logs the suppressed - exceptions as errors with traceback. - - The caught exception is also stored on the context manager instance under - attribute ``exception``. 
- """ - - def __init__(self, *exceptions): - self._exceptions = exceptions - self.exception = None - - def __enter__(self): - return self - - def __exit__(self, exctype, excinst, exctb): - caught_error = exctype is not None and issubclass(exctype, self._exceptions) - if caught_error: - self.exception = excinst - logger = logging.getLogger() - logger.error(str(excinst), exc_info=True) - return caught_error + self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict())) + final_state = None + try: + launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds) + final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs) + except AirflowException: + if self.log_events_on_failure: + for event in launcher.read_pod_events(self.pod).items: + self.log.error("Pod Event: %s - %s", event.reason, event.message) + raise + finally: + if self.is_delete_operator_pod: + self.log.debug("Deleting pod for task %s", self.task_id) + launcher.delete_pod(self.pod) + elif final_state != State.SUCCESS: + self.patch_already_checked(self.pod) + return final_state, remote_pod, result -def _prune_dict(val: Any, mode='strict'): - """ - Note: this is duplicated from ``airflow.utils.helpers.prune_dict``. That one should - be the one used if possible, but this one is included to avoid having to - bump min airflow version. This function will be removed once the min airflow version - is bumped to 2.3. + def patch_already_checked(self, pod: k8s.V1Pod): + """Add an "already tried annotation to ensure we only retry once""" + pod.metadata.labels["already_checked"] = "True" + body = PodGenerator.serialize_pod(pod) + self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body) - Given dict ``val``, returns new dict based on ``val`` with all - empty elements removed. + def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]: + """ + Monitors a pod to completion that was created by a previous KubernetesPodOperator - What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict' - then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x`` - will be removed if ``bool(x) is False``. 
- """ + :param launcher: pod launcher that will manage launching and monitoring pods + :param pod: podspec used to find pod using k8s API + :return: + """ + try: + (final_state, remote_pod, result) = launcher.monitor_pod(pod, get_logs=self.get_logs) + finally: + if self.is_delete_operator_pod: + launcher.delete_pod(pod) + if final_state != State.SUCCESS: + if self.log_events_on_failure: + for event in launcher.read_pod_events(pod).items: + self.log.error("Pod Event: %s - %s", event.reason, event.message) + if not self.is_delete_operator_pod: + self.patch_already_checked(pod) + raise AirflowException(f'Pod returned a failure: {final_state}') + return final_state, remote_pod, result - def is_empty(x): - if mode == 'strict': - return x is None - elif mode == 'truthy': - return bool(x) is False - raise ValueError("allowable values for `mode` include 'truthy' and 'strict'") - - if isinstance(val, dict): - new_dict = {} - for k, v in val.items(): - if is_empty(v): - continue - elif isinstance(v, (list, dict)): - new_val = _prune_dict(v, mode=mode) - if new_val: - new_dict[k] = new_val - else: - new_dict[k] = v - return new_dict - elif isinstance(val, list): - new_list = [] - for v in val: - if is_empty(v): - continue - elif isinstance(v, (list, dict)): - new_val = _prune_dict(v, mode=mode) - if new_val: - new_list.append(new_val) - else: - new_list.append(v) - return new_list - else: - return val + def on_kill(self) -> None: + if self.pod: + pod: k8s.V1Pod = self.pod + namespace = pod.metadata.namespace + name = pod.metadata.name + kwargs = {} + if self.termination_grace_period is not None: + kwargs = {"grace_period_seconds": self.termination_grace_period} + self.client.delete_namespaced_pod(name=name, namespace=namespace, **kwargs) diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index 10296871efc60..9779292dffb67 100644 --- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -15,14 +15,11 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import TYPE_CHECKING, Optional, Sequence +from typing import Optional from airflow.models import BaseOperator from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook -if TYPE_CHECKING: - from airflow.utils.context import Context - class SparkKubernetesOperator(BaseOperator): """ @@ -34,15 +31,20 @@ class SparkKubernetesOperator(BaseOperator): :param application_file: Defines Kubernetes 'custom_resource_definition' of 'sparkApplication' as either a path to a '.json' file or a JSON string. + :type application_file: str :param namespace: kubernetes namespace to put sparkApplication + :type namespace: str :param kubernetes_conn_id: The :ref:`kubernetes connection id ` for the to Kubernetes cluster. 
+    :type kubernetes_conn_id: str
     :param api_group: kubernetes api group of sparkApplication
+    :type api_group: str
     :param api_version: kubernetes api version of sparkApplication
+    :type api_version: str
     """

-    template_fields: Sequence[str] = ('application_file', 'namespace')
-    template_ext: Sequence[str] = ('.yaml', '.yml', '.json')
+    template_fields = ['application_file', 'namespace']
+    template_ext = ('.yaml', '.yml', '.json')
     ui_color = '#f4a460'

     def __init__(
@@ -62,7 +64,7 @@ def __init__(
         self.api_group = api_group
         self.api_version = api_version

-    def execute(self, context: 'Context'):
+    def execute(self, context):
         self.log.info("Creating sparkApplication")
         hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
         response = hook.create_custom_object(
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml
index b5b50542573ab..c7878ba9c416f 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -22,14 +22,6 @@ description: |
     `Kubernetes <https://kubernetes.io/>`__

 versions:
-  - 3.1.2
-  - 3.1.1
-  - 3.1.0
-  - 3.0.2
-  - 3.0.1
-  - 3.0.0
-  - 2.2.0
-  - 2.1.0
   - 2.0.3
   - 2.0.2
   - 2.0.1
diff --git a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
index 15ac40bcdb90a..da29e7974f298 100644
--- a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import TYPE_CHECKING, Optional, Sequence
+from typing import Dict, Optional

 from kubernetes import client

@@ -23,9 +23,6 @@
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
 from airflow.sensors.base import BaseSensorOperator

-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-

 class SparkKubernetesSensor(BaseSensorOperator):
     """
@@ -36,15 +33,21 @@ class SparkKubernetesSensor(BaseSensorOperator):
     https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication

     :param application_name: spark Application resource name
+    :type application_name: str
     :param namespace: the kubernetes namespace where the sparkApplication resides
+    :type namespace: str
     :param kubernetes_conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
         to the Kubernetes cluster.
+ :type kubernetes_conn_id: str :param attach_log: determines whether logs for driver pod should be appended to the sensor log + :type attach_log: bool :param api_group: kubernetes api group of sparkApplication + :type api_group: str :param api_version: kubernetes api version of sparkApplication + :type api_version: str """ - template_fields: Sequence[str] = ("application_name", "namespace") + template_fields = ("application_name", "namespace") FAILURE_STATES = ("FAILED", "UNKNOWN") SUCCESS_STATES = ("COMPLETED",) @@ -94,7 +97,7 @@ def _log_driver(self, application_state: str, response: dict) -> None: e, ) - def poke(self, context: 'Context') -> bool: + def poke(self, context: Dict) -> bool: self.log.info("Poking: %s", self.application_name) response = self.hook.get_custom_object( group=self.api_group, diff --git a/setup.py b/setup.py index 0e6ae83a6557a..794c33bcd0e94 100644 --- a/setup.py +++ b/setup.py @@ -406,7 +406,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version ] kubernetes = [ 'cryptography>=2.0.0', - 'kubernetes>=21.7.0', + 'kubernetes>=3.0.0, <12.0.0', ] kylin = ['kylinpy>=2.6'] ldap = [ diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py index ce040cf3ed8f2..9228e9b704a54 100644 --- a/tests/kubernetes/test_client.py +++ b/tests/kubernetes/test_client.py @@ -22,21 +22,25 @@ from kubernetes.client import Configuration from urllib3.connection import HTTPConnection, HTTPSConnection -from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client +from airflow.kubernetes.kube_client import ( + RefreshConfiguration, + _disable_verify_ssl, + _enable_tcp_keepalive, + get_kube_client, +) class TestClient(unittest.TestCase): @mock.patch('airflow.kubernetes.kube_client.config') - def test_load_cluster_config(self, config): - get_kube_client(in_cluster=True) - assert config.load_incluster_config.called - assert config.load_kube_config.not_called + def test_load_cluster_config(self, _): + client = get_kube_client(in_cluster=True) + assert not isinstance(client.api_client.configuration, RefreshConfiguration) @mock.patch('airflow.kubernetes.kube_client.config') - def test_load_file_config(self, config): - get_kube_client(in_cluster=False) - assert config.load_incluster_config.not_called - assert config.load_kube_config.called + @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file') + def test_load_file_config(self, _, _2): + client = get_kube_client(in_cluster=False) + assert isinstance(client.api_client.configuration, RefreshConfiguration) def test_enable_tcp_keepalive(self): socket_options = [ diff --git a/tests/kubernetes/test_refresh_config.py b/tests/kubernetes/test_refresh_config.py new file mode 100644 index 0000000000000..a0753e2e4b209 --- /dev/null +++ b/tests/kubernetes/test_refresh_config.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from unittest import TestCase + +import pytest +from pendulum.parsing import ParserError + +from airflow.kubernetes.refresh_config import _parse_timestamp + + +class TestRefreshKubeConfigLoader(TestCase): + def test_parse_timestamp_should_convert_z_timezone_to_unix_timestamp(self): + ts = _parse_timestamp("2020-01-13T13:42:20Z") + assert 1578922940 == ts + + def test_parse_timestamp_should_convert_regular_timezone_to_unix_timestamp(self): + ts = _parse_timestamp("2020-01-13T13:42:20+0600") + assert 1578922940 == ts + + def test_parse_timestamp_should_throw_exception(self): + with pytest.raises(ParserError): + _parse_timestamp("foobar")
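
A note on the two timezone tests above: `_parse_timestamp` feeds the parsed wall-clock fields through `calendar.timegm`, which interprets them as UTC and ignores the parsed offset entirely. That is why the `Z` and `+0600` inputs are both expected to produce the same epoch value. A minimal standalone sketch of that behaviour (`parse_timestamp` here is a stand-in for the private helper, not part of this change):

    import calendar
    from typing import cast

    import pendulum

    def parse_timestamp(ts_str: str) -> int:
        # timetuple() exposes only the naive wall-clock fields, and
        # calendar.timegm() treats those fields as UTC, so any parsed
        # UTC offset is effectively discarded.
        parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str))
        return calendar.timegm(parsed_dt.timetuple())

    # Both inputs share the wall-clock time 13:42:20, so both map to the
    # same epoch second even though their offsets differ.
    assert parse_timestamp("2020-01-13T13:42:20Z") == 1578922940
    assert parse_timestamp("2020-01-13T13:42:20+0600") == 1578922940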
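
For context on the reattach flow restored in `execute` above: candidate pods are located with a selector built by `_get_pod_identifying_label_string`, which drops `try_number` (so a retried task can still find the pod from an earlier attempt) and appends `already_checked!=True` (so a pod patched by `patch_already_checked` is never matched again). A rough sketch of the selector this produces; the label values are illustrative only:

    def get_pod_identifying_label_string(labels: dict) -> str:
        # Mirrors the operator's helper: exclude try_number so retries can
        # still match, and filter out pods already marked as checked.
        label_strings = [
            f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
        ]
        return ','.join(label_strings) + ',already_checked!=True'

    labels = {
        'dag_id': 'example_dag',  # illustrative values, not from this change
        'task_id': 'example_task',
        'execution_date': '2020-01-13T134220',
        'try_number': '2',  # present on the pod, excluded from the selector
    }
    print(get_pod_identifying_label_string(labels))
    # dag_id=example_dag,execution_date=2020-01-13T134220,task_id=example_task,already_checked!=True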