From 574c63e8038ad588758e5835e012859308882e85 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 7 Jan 2022 13:47:06 -0800 Subject: [PATCH 1/4] Remove RefreshConfiguration workaround for K8s token refreshing A workaround was added (https://github.com/apache/airflow/pull/5731) to handle the refreshing of EKS tokens. It was necessary because of an upstream bug. It has since been fixed (https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) and released in v21.7.0 (https://github.com/kubernetes-client/python/blob/master/CHANGELOG.md#v2170). --- airflow/kubernetes/kube_client.py | 47 ++------- airflow/kubernetes/refresh_config.py | 124 ------------------------ setup.py | 2 +- tests/kubernetes/test_client.py | 14 +-- tests/kubernetes/test_refresh_config.py | 106 -------------------- 5 files changed, 15 insertions(+), 278 deletions(-) delete mode 100644 airflow/kubernetes/refresh_config.py delete mode 100644 tests/kubernetes/test_refresh_config.py diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py index 97836be9986088..7e6ba051197877 100644 --- a/airflow/kubernetes/kube_client.py +++ b/airflow/kubernetes/kube_client.py @@ -25,39 +25,10 @@ try: from kubernetes import client, config from kubernetes.client import Configuration - from kubernetes.client.api_client import ApiClient from kubernetes.client.rest import ApiException - from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config - has_kubernetes = True - def _get_kube_config( - in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str] - ) -> Optional[Configuration]: - if in_cluster: - # load_incluster_config set default configuration with config populated by k8s - config.load_incluster_config() - return None - else: - # this block can be replaced with just config.load_kube_config once - # refresh_config module is replaced with upstream fix - cfg = RefreshConfiguration() - load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context) - return cfg - - def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api: - """ - This is a workaround for supporting api token refresh in k8s client. - - The function can be replace with `return client.CoreV1Api()` once the - upstream client supports token refresh. - """ - if cfg: - return client.CoreV1Api(api_client=ApiClient(configuration=cfg)) - else: - return client.CoreV1Api() - def _disable_verify_ssl() -> None: configuration = Configuration() configuration.verify_ssl = False @@ -126,17 +97,19 @@ def get_kube_client( if not has_kubernetes: raise _import_err - if not in_cluster: - if cluster_context is None: - cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None) - if config_file is None: - config_file = conf.get('kubernetes', 'config_file', fallback=None) - if conf.getboolean('kubernetes', 'enable_tcp_keepalive'): _enable_tcp_keepalive() if not conf.getboolean('kubernetes', 'verify_ssl'): _disable_verify_ssl() - client_conf = _get_kube_config(in_cluster, cluster_context, config_file) - return _get_client_with_patched_configuration(client_conf) + if in_cluster: + config.load_incluster_config() + else: + if cluster_context is None: + cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None) + if config_file is None: + config_file = conf.get('kubernetes', 'config_file', fallback=None) + config.load_kube_config(config_file=config_file, context=cluster_context) + + return client.CoreV1Api() diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py deleted file mode 100644 index 25649510ce0d29..00000000000000 --- a/airflow/kubernetes/refresh_config.py +++ /dev/null @@ -1,124 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -NOTE: this module can be removed once upstream client supports token refresh -see: https://github.com/kubernetes-client/python/issues/741 -""" - -import calendar -import logging -import os -import time -from typing import Optional, cast - -import pendulum -from kubernetes.client import Configuration -from kubernetes.config.exec_provider import ExecProvider -from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader - -from airflow.utils import yaml - - -def _parse_timestamp(ts_str: str) -> int: - parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str)) - return calendar.timegm(parsed_dt.timetuple()) - - -class RefreshKubeConfigLoader(KubeConfigLoader): - """ - Patched KubeConfigLoader, this subclass takes expirationTimestamp into - account and sets api key refresh callback hook in Configuration object - """ - - def __init__(self, *args, **kwargs): - KubeConfigLoader.__init__(self, *args, **kwargs) - self.api_key_expire_ts = None - - def _load_from_exec_plugin(self): - """ - We override _load_from_exec_plugin method to also read and store - expiration timestamp for aws-iam-authenticator. It will be later - used for api token refresh. - """ - if 'exec' not in self._user: - return None - try: - status = ExecProvider(self._user['exec']).run() - if 'token' not in status: - logging.error('exec: missing token field in plugin output') - return None - self.token = f"Bearer {status['token']}" - ts_str = status.get('expirationTimestamp') - if ts_str: - self.api_key_expire_ts = _parse_timestamp(ts_str) - return True - except Exception as e: - logging.error(str(e)) - return None - - def refresh_api_key(self, client_configuration): - """Refresh API key if expired""" - if self.api_key_expire_ts and time.time() >= self.api_key_expire_ts: - self.load_and_set(client_configuration) - - def load_and_set(self, client_configuration): - KubeConfigLoader.load_and_set(self, client_configuration) - client_configuration.refresh_api_key = self.refresh_api_key - - -class RefreshConfiguration(Configuration): - """ - Patched Configuration, this subclass takes api key refresh callback hook - into account - """ - - def __init__(self, *args, **kwargs): - Configuration.__init__(self, *args, **kwargs) - self.refresh_api_key = None - - def get_api_key_with_prefix(self, identifier): - if self.refresh_api_key: - self.refresh_api_key(self) - return Configuration.get_api_key_with_prefix(self, identifier) - - -def _get_kube_config_loader_for_yaml_file(filename, **kwargs) -> Optional[RefreshKubeConfigLoader]: - """ - Adapted from the upstream _get_kube_config_loader_for_yaml_file function, changed - KubeConfigLoader to RefreshKubeConfigLoader - """ - with open(filename) as f: - return RefreshKubeConfigLoader( - config_dict=yaml.safe_load(f), - config_base_path=os.path.abspath(os.path.dirname(filename)), - **kwargs, - ) - - -def load_kube_config(client_configuration, config_file=None, context=None): - """ - Adapted from the upstream load_kube_config function, changes: - - removed persist_config argument since it's not being used - - remove `client_configuration is None` branch since we always pass - in client configuration - """ - if config_file is None: - config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION) - - loader = _get_kube_config_loader_for_yaml_file(config_file, active_context=context, config_persister=None) - loader.load_and_set(client_configuration) diff --git a/setup.py b/setup.py index f31f2cd4a49a2f..3118636bb415ba 100644 --- a/setup.py +++ b/setup.py @@ -414,7 +414,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version ] kubernetes = [ 'cryptography>=2.0.0', - 'kubernetes>=3.0.0', + 'kubernetes>=21.7.0', ] kylin = ['kylinpy>=2.6'] ldap = [ diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py index 9228e9b704a547..653532f1337da6 100644 --- a/tests/kubernetes/test_client.py +++ b/tests/kubernetes/test_client.py @@ -22,25 +22,19 @@ from kubernetes.client import Configuration from urllib3.connection import HTTPConnection, HTTPSConnection -from airflow.kubernetes.kube_client import ( - RefreshConfiguration, - _disable_verify_ssl, - _enable_tcp_keepalive, - get_kube_client, -) +from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client class TestClient(unittest.TestCase): @mock.patch('airflow.kubernetes.kube_client.config') def test_load_cluster_config(self, _): client = get_kube_client(in_cluster=True) - assert not isinstance(client.api_client.configuration, RefreshConfiguration) + assert not isinstance(client.api_client.configuration, Configuration) @mock.patch('airflow.kubernetes.kube_client.config') - @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file') - def test_load_file_config(self, _, _2): + def test_load_file_config(self, _): client = get_kube_client(in_cluster=False) - assert isinstance(client.api_client.configuration, RefreshConfiguration) + assert isinstance(client.api_client.configuration, Configuration) def test_enable_tcp_keepalive(self): socket_options = [ diff --git a/tests/kubernetes/test_refresh_config.py b/tests/kubernetes/test_refresh_config.py deleted file mode 100644 index 61c1b86d0161e6..00000000000000 --- a/tests/kubernetes/test_refresh_config.py +++ /dev/null @@ -1,106 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import os -from unittest import TestCase, mock - -import pytest -from kubernetes.config.kube_config import ConfigNode -from pendulum.parsing import ParserError - -from airflow.kubernetes.refresh_config import ( - RefreshConfiguration, - RefreshKubeConfigLoader, - _get_kube_config_loader_for_yaml_file, - _parse_timestamp, -) - - -class TestRefreshKubeConfigLoader(TestCase): - ROOT_PROJECT_DIR = os.path.abspath( - os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir) - ) - - KUBE_CONFIG_PATH = os.path.join(ROOT_PROJECT_DIR, "tests", "kubernetes", "kube_config") - - def test_parse_timestamp_should_convert_z_timezone_to_unix_timestamp(self): - ts = _parse_timestamp("2020-01-13T13:42:20Z") - assert 1578922940 == ts - - def test_parse_timestamp_should_convert_regular_timezone_to_unix_timestamp(self): - ts = _parse_timestamp("2020-01-13T13:42:20+0600") - assert 1578922940 == ts - - def test_parse_timestamp_should_throw_exception(self): - with pytest.raises(ParserError): - _parse_timestamp("foobar") - - def test_get_kube_config_loader_for_yaml_file(self): - refresh_kube_config_loader = _get_kube_config_loader_for_yaml_file(self.KUBE_CONFIG_PATH) - - assert refresh_kube_config_loader is not None - - assert refresh_kube_config_loader.current_context['name'] == 'federal-context' - - context = refresh_kube_config_loader.current_context['context'] - assert context is not None - assert context['cluster'] == 'horse-cluster' - assert context['namespace'] == 'chisel-ns' - assert context['user'] == 'green-user' - - def test_get_api_key_with_prefix(self): - - refresh_config = RefreshConfiguration() - refresh_config.api_key['key'] = '1234' - assert refresh_config is not None - - api_key = refresh_config.get_api_key_with_prefix("key") - - assert api_key == '1234' - - @mock.patch('kubernetes.config.exec_provider.ExecProvider.__init__', return_value=None) - @mock.patch('kubernetes.config.exec_provider.ExecProvider.run', return_value={'token': '1234'}) - def test_refresh_kube_config_loader(self, exec_provider_run, exec_provider_init): - current_context = _get_kube_config_loader_for_yaml_file(self.KUBE_CONFIG_PATH).current_context - - config_dict = {} - config_dict['current-context'] = 'federal-context' - config_dict['contexts'] = [] - config_dict['contexts'].append(current_context) - - config_dict['clusters'] = [] - - cluster_config = {} - cluster_config['api-version'] = 'v1' - cluster_config['server'] = 'http://cow.org:8080' - cluster_config['name'] = 'horse-cluster' - cluster_root_config = {} - cluster_root_config['cluster'] = cluster_config - cluster_root_config['name'] = 'horse-cluster' - config_dict['clusters'].append(cluster_root_config) - - refresh_kube_config_loader = RefreshKubeConfigLoader(config_dict=config_dict) - refresh_kube_config_loader._user = {} - - config_node = ConfigNode('command', 'test') - config_node.__dict__['apiVersion'] = '2.0' - config_node.__dict__['command'] = 'test' - - refresh_kube_config_loader._user['exec'] = config_node - - result = refresh_kube_config_loader._load_from_exec_plugin() - assert result is not None From db9d635d903681381cac73308dce8edb0323a4dc Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 7 Jan 2022 13:57:54 -0800 Subject: [PATCH 2/4] fixup! Remove RefreshConfiguration workaround for K8s token refreshing --- UPDATING.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/UPDATING.md b/UPDATING.md index cda775f624b808..c929eced3de711 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -81,6 +81,10 @@ https://developers.google.com/style/inclusive-documentation --> +### Minimum kubernetes version bumped from 3.0.0 to 21.7.0 + +No change in behavior is expected. This was necessary in order to take advantage of a [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) concerning refreshing of Kubernetes API tokens with EKS, which enabled the removal of some [workaround code](https://github.com/apache/airflow/pull/20759). + ### Deprecation: `Connection.extra` must be JSON-encoded dict #### TLDR From 9892028113ba80020501631d0573b55de289f658 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 7 Jan 2022 21:55:02 -0800 Subject: [PATCH 3/4] fixup! Remove RefreshConfiguration workaround for K8s token refreshing --- tests/kubernetes/test_client.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py index 653532f1337da6..ce040cf3ed8f21 100644 --- a/tests/kubernetes/test_client.py +++ b/tests/kubernetes/test_client.py @@ -27,14 +27,16 @@ class TestClient(unittest.TestCase): @mock.patch('airflow.kubernetes.kube_client.config') - def test_load_cluster_config(self, _): - client = get_kube_client(in_cluster=True) - assert not isinstance(client.api_client.configuration, Configuration) + def test_load_cluster_config(self, config): + get_kube_client(in_cluster=True) + assert config.load_incluster_config.called + assert config.load_kube_config.not_called @mock.patch('airflow.kubernetes.kube_client.config') - def test_load_file_config(self, _): - client = get_kube_client(in_cluster=False) - assert isinstance(client.api_client.configuration, Configuration) + def test_load_file_config(self, config): + get_kube_client(in_cluster=False) + assert config.load_incluster_config.not_called + assert config.load_kube_config.called def test_enable_tcp_keepalive(self): socket_options = [ From 1aca5915c8abbff04323665014ccbe2801ae8599 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 16 Mar 2022 09:58:32 -0700 Subject: [PATCH 4/4] remove old k8s classes --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 17c2225631a48f..2323ae154870bd 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -38,11 +38,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: - try: - # Kube >= 19 - from kubernetes.client.models.core_v1_event_list import CoreV1EventList as V1EventList - except ImportError: - from kubernetes.client.models.v1_event_list import V1EventList + from kubernetes.client.models.core_v1_event_list import CoreV1EventList class PodLaunchFailedException(AirflowException): @@ -298,7 +294,7 @@ def read_pod_logs( raise @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) - def read_pod_events(self, pod: V1Pod) -> "V1EventList": + def read_pod_events(self, pod: V1Pod) -> "CoreV1EventList": """Reads events from the POD""" try: return self._client.list_namespaced_event(