Skip to content

Commit

Permalink
Revert "Remove RefreshConfiguration workaround for K8s token refreshi…
Browse files Browse the repository at this point in the history
…ng (#20759)"

This reverts commit d39197f.
  • Loading branch information
potiuk authored and ephraimbuddy committed Mar 28, 2022
1 parent e0754a4 commit 60a2b90
Show file tree
Hide file tree
Showing 19 changed files with 537 additions and 848 deletions.
6 changes: 2 additions & 4 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ https://developers.google.com/style/inclusive-documentation

## Airflow 2.2.5

### Minimum kubernetes version bumped from 3.0.0 to 21.7.0

No change in behavior is expected. This was necessary in order to take advantage of a [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) concerning refreshing of Kubernetes API tokens with EKS, which enabled the removal of some [workaround code](https://github.com/apache/airflow/pull/20759).
No breaking changes.

## Airflow 2.2.4

Expand Down Expand Up @@ -1381,7 +1379,7 @@ delete this option.
#### `airflow.models.dagbag.DagBag`
Passing `store_serialized_dags` argument to DagBag.__init__ and accessing `DagBag.store_serialized_dags` property
Passing `store_serialized_dags` argument to `DagBag.__init__` and accessing `DagBag.store_serialized_dags` property
are deprecated and will be removed in future versions.
Expand Down
47 changes: 37 additions & 10 deletions airflow/kubernetes/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,39 @@
try:
from kubernetes import client, config
from kubernetes.client import Configuration
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config

has_kubernetes = True

def _get_kube_config(
    in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str]
) -> Optional[Configuration]:
    """
    Load the Kubernetes client configuration for the requested mode.

    When ``in_cluster`` is true, ``config.load_incluster_config()`` is called and
    ``None`` is returned. Otherwise a ``RefreshConfiguration`` is created, filled in
    by ``load_kube_config`` from ``config_file`` / ``cluster_context``, and returned.
    """
    if in_cluster:
        # load_incluster_config sets the default configuration with values populated
        # by k8s, so there is no explicit configuration object to hand back
        config.load_incluster_config()
        return None

    # NOTE: once the refresh_config module is replaced with the upstream fix, this
    # part can be reduced to a plain config.load_kube_config call
    refreshable_cfg = RefreshConfiguration()
    load_kube_config(context=cluster_context, config_file=config_file, client_configuration=refreshable_cfg)
    return refreshable_cfg

def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api:
    """
    Build a CoreV1Api client, bound to ``cfg`` when one is supplied.

    This is a workaround for supporting api token refresh in the k8s client. The
    function can be replaced with ``return client.CoreV1Api()`` once the upstream
    client supports token refresh.
    """
    if not cfg:
        # no explicit configuration: construct the client without an api_client argument
        return client.CoreV1Api()
    return client.CoreV1Api(api_client=ApiClient(configuration=cfg))

def _disable_verify_ssl() -> None:
configuration = Configuration()
configuration.verify_ssl = False
Expand Down Expand Up @@ -101,19 +130,17 @@ def get_kube_client(
if not has_kubernetes:
raise _import_err

if not in_cluster:
if cluster_context is None:
cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
if config_file is None:
config_file = conf.get('kubernetes', 'config_file', fallback=None)

if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
_enable_tcp_keepalive()

if not conf.getboolean('kubernetes', 'verify_ssl'):
_disable_verify_ssl()

if in_cluster:
config.load_incluster_config()
else:
if cluster_context is None:
cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
if config_file is None:
config_file = conf.get('kubernetes', 'config_file', fallback=None)
config.load_kube_config(config_file=config_file, context=cluster_context)

return client.CoreV1Api()
client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
return _get_client_with_patched_configuration(client_conf)
124 changes: 124 additions & 0 deletions airflow/kubernetes/refresh_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
NOTE: this module can be removed once the upstream client supports token refresh;
see https://github.com/kubernetes-client/python/issues/741
"""

import calendar
import logging
import os
import time
from typing import Optional, cast

import pendulum
from kubernetes.client import Configuration
from kubernetes.config.exec_provider import ExecProvider
from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader

from airflow.utils import yaml


def _parse_timestamp(ts_str: str) -> int:
    """
    Convert a timestamp string into whole seconds since the Unix epoch.

    :param ts_str: timestamp text to be parsed with ``pendulum.parse``
    :return: the corresponding epoch time in seconds, as an integer
    """
    parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str))
    # calendar.timegm interprets its argument as UTC, so it must be given the UTC
    # time tuple; timetuple() would pass the wall-clock fields and silently drop
    # any non-zero UTC offset carried by the parsed value
    return calendar.timegm(parsed_dt.utctimetuple())


class RefreshKubeConfigLoader(KubeConfigLoader):
    """
    KubeConfigLoader variant that remembers when the exec-plugin token expires.

    The expiry read from ``expirationTimestamp`` is stored on the loader, and
    ``load_and_set`` registers ``refresh_api_key`` on the Configuration object as
    its api key refresh callback.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # epoch seconds at which the current token expires; stays None until the
        # exec plugin output provides an expirationTimestamp
        self.api_key_expire_ts = None

    def _load_from_exec_plugin(self):
        """
        Run the exec plugin and keep both the token and its expiration timestamp.

        Overrides the parent method so that the expiration timestamp reported by
        aws-iam-authenticator is also read and stored; it is used later for the
        api token refresh.

        :return: True when a token was loaded, None otherwise
        """
        if 'exec' not in self._user:
            return None
        try:
            plugin_output = ExecProvider(self._user['exec']).run()
            if 'token' in plugin_output:
                self.token = f"Bearer {plugin_output['token']}"
                expiry = plugin_output.get('expirationTimestamp')
                if expiry:
                    self.api_key_expire_ts = _parse_timestamp(expiry)
                return True
            logging.error('exec: missing token field in plugin output')
        except Exception as err:
            logging.error(str(err))
        return None

    def refresh_api_key(self, client_configuration):
        """Reload the credentials into ``client_configuration`` once the stored expiry has passed."""
        expires_at = self.api_key_expire_ts
        if not expires_at:
            return
        if time.time() >= expires_at:
            self.load_and_set(client_configuration)

    def load_and_set(self, client_configuration):
        """Apply the loaded settings, then attach ``refresh_api_key`` as the refresh callback."""
        super().load_and_set(client_configuration)
        client_configuration.refresh_api_key = self.refresh_api_key


class RefreshConfiguration(Configuration):
    """
    Configuration variant that invokes an optional api key refresh callback
    before an api key is looked up.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # callable invoked with this configuration; None means no refresh is attempted
        self.refresh_api_key = None

    def get_api_key_with_prefix(self, identifier):
        """Run the refresh callback, if one is set, then defer to the parent lookup."""
        refresh_hook = self.refresh_api_key
        if refresh_hook:
            refresh_hook(self)
        return super().get_api_key_with_prefix(identifier)


def _get_kube_config_loader_for_yaml_file(filename, **kwargs) -> Optional[RefreshKubeConfigLoader]:
    """
    Create a RefreshKubeConfigLoader from a kube config YAML file.

    Adapted from the upstream _get_kube_config_loader_for_yaml_file function, with
    KubeConfigLoader changed to RefreshKubeConfigLoader.

    :param filename: path of the kube config file to read
    :param kwargs: forwarded unchanged to the RefreshKubeConfigLoader constructor
    """
    with open(filename) as config_stream:
        parsed_config = yaml.safe_load(config_stream)
        # the directory containing the config file is passed as the loader's base path
        base_dir = os.path.abspath(os.path.dirname(filename))
        return RefreshKubeConfigLoader(config_dict=parsed_config, config_base_path=base_dir, **kwargs)


def load_kube_config(client_configuration, config_file=None, context=None):
    """
    Load a kube config file into ``client_configuration``.

    Adapted from the upstream load_kube_config function, with these changes:
        - the persist_config argument was removed since it is not being used
        - the `client_configuration is None` branch was removed since a client
          configuration is always passed in

    :param client_configuration: configuration object the loader populates
    :param config_file: path of the kube config file; when None, the expanded
        KUBE_CONFIG_DEFAULT_LOCATION is used
    :param context: passed to the loader as its active context
    """
    kube_config_path = (
        os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION) if config_file is None else config_file
    )
    config_loader = _get_kube_config_loader_for_yaml_file(
        kube_config_path, active_context=context, config_persister=None
    )
    config_loader.load_and_set(client_configuration)
Loading

0 comments on commit 60a2b90

Please sign in to comment.