Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use KubernetesHook to create api client in KubernetesPodOperator #20578

Merged
merged 23 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions airflow/providers/cncf/kubernetes/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@
Changelog
---------

main
....

Features
~~~~~~~~

KubernetesPodOperator now uses KubernetesHook
`````````````````````````````````````````````

Previously, KubernetesPodOperator relied on core Airflow configuration (namely the settings for the Kubernetes executor) for certain settings used in client generation. Now KubernetesPodOperator uses KubernetesHook, and the consideration of core k8s settings is officially deprecated.

If you are using the Airflow configuration settings (e.g. as opposed to operator params) to configure the kubernetes client, then prior to the next major release you will need to add an Airflow connection and set your KPO tasks to use that connection.

4.0.2
.....

Expand Down
119 changes: 112 additions & 7 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
# under the License.
import sys
import tempfile
from typing import Any, Dict, Generator, Optional, Tuple, Union
import warnings
from typing import Any, Dict, Generator, List, Optional, Tuple, Union

from kubernetes.config import ConfigException

from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive

if sys.version_info >= (3, 8):
from functools import cached_property
else:
Expand Down Expand Up @@ -63,6 +66,14 @@ class KubernetesHook(BaseHook):

:param conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
to Kubernetes cluster.
:param client_configuration: Optional dictionary of client configuration params.
Passed on to kubernetes client.
:param cluster_context: Optionally specify a context to use (e.g. if you have multiple
contexts in your kubeconfig).
:param config_file: Path to kubeconfig file.
:param in_cluster: Set to ``True`` if running from within a kubernetes cluster.
:param disable_verify_ssl: Set to ``True`` if SSL verification should be disabled.
:param disable_tcp_keepalive: Set to ``True`` if you want to disable keepalive logic.
"""

conn_name_attr = 'kubernetes_conn_id'
Expand Down Expand Up @@ -91,6 +102,8 @@ def get_connection_form_widgets() -> Dict[str, Any]:
"extra__kubernetes__cluster_context": StringField(
lazy_gettext('Cluster context'), widget=BS3TextFieldWidget()
),
"extra__kubernetes__disable_verify_ssl": BooleanField(lazy_gettext('Disable SSL')),
"extra__kubernetes__disable_tcp_keepalive": BooleanField(lazy_gettext('Disable TCP keepalive')),
}

@staticmethod
Expand All @@ -108,37 +121,77 @@ def __init__(
cluster_context: Optional[str] = None,
config_file: Optional[str] = None,
in_cluster: Optional[bool] = None,
disable_verify_ssl: Optional[bool] = None,
disable_tcp_keepalive: Optional[bool] = None,
) -> None:
super().__init__()
self.conn_id = conn_id
self.client_configuration = client_configuration
self.cluster_context = cluster_context
self.config_file = config_file
self.in_cluster = in_cluster
self.disable_verify_ssl = disable_verify_ssl
self.disable_tcp_keepalive = disable_tcp_keepalive

# these params used for transition in KPO to K8s hook
# for a deprecation period we will continue to consider k8s settings from airflow.cfg
self._deprecated_core_disable_tcp_keepalive: Optional[bool] = None
self._deprecated_core_disable_verify_ssl: Optional[bool] = None
self._deprecated_core_in_cluster: Optional[bool] = None
self._deprecated_core_cluster_context: Optional[str] = None
self._deprecated_core_config_file: Optional[str] = None

@staticmethod
def _coalesce_param(*params):
for param in params:
if param is not None:
return param

def get_conn(self) -> Any:
"""Returns kubernetes api session for use with requests"""
@cached_property
def conn_extras(self):
if self.conn_id:
connection = self.get_connection(self.conn_id)
extras = connection.extra_dejson
else:
extras = {}
return extras

def _get_field(self, field_name):
if field_name.startswith('extra_'):
raise ValueError(
f"Got prefixed name {field_name}; please remove the 'extra__kubernetes__' prefix "
f"when using this method."
)
if field_name in self.conn_extras:
return self.conn_extras[field_name] or None
prefixed_name = f"extra__kubernetes__{field_name}"
return self.conn_extras.get(prefixed_name) or None

@staticmethod
def _deprecation_warning_core_param(deprecation_warnings):
settings_list_str = ''.join([f"\n\t{k}={v!r}" for k, v in deprecation_warnings])
warnings.warn(
f"\nApplying core Airflow settings from section [kubernetes] with the following keys:"
f"{settings_list_str}\n"
"In a future release, KubernetesPodOperator will no longer consider core\n"
"Airflow settings; define an Airflow connection instead.",
DeprecationWarning,
)

def get_conn(self) -> Any:
"""Returns kubernetes api session for use with requests"""

in_cluster = self._coalesce_param(
self.in_cluster, extras.get("extra__kubernetes__in_cluster") or None
self.in_cluster, self.conn_extras.get("extra__kubernetes__in_cluster") or None
)
cluster_context = self._coalesce_param(
self.cluster_context, extras.get("extra__kubernetes__cluster_context") or None
self.cluster_context, self.conn_extras.get("extra__kubernetes__cluster_context") or None
)
kubeconfig_path = self._coalesce_param(
self.config_file, extras.get("extra__kubernetes__kube_config_path") or None
self.config_file, self.conn_extras.get("extra__kubernetes__kube_config_path") or None
)
kubeconfig = extras.get("extra__kubernetes__kube_config") or None

kubeconfig = self.conn_extras.get("extra__kubernetes__kube_config") or None
num_selected_configuration = len([o for o in [in_cluster, kubeconfig, kubeconfig_path] if o])

if num_selected_configuration > 1:
Expand All @@ -147,6 +200,43 @@ def get_conn(self) -> Any:
"kube_config, in_cluster are mutually exclusive. "
"You can only use one option at a time."
)

disable_verify_ssl = self._coalesce_param(
self.disable_verify_ssl, _get_bool(self._get_field("disable_verify_ssl"))
)
disable_tcp_keepalive = self._coalesce_param(
self.disable_tcp_keepalive, _get_bool(self._get_field("disable_tcp_keepalive"))
)

# BEGIN apply settings from core kubernetes configuration
# this section should be removed in next major release
deprecation_warnings: List[Tuple[str, Any]] = []
if disable_verify_ssl is None and self._deprecated_core_disable_verify_ssl is True:
deprecation_warnings.append(('verify_ssl', False))
disable_verify_ssl = self._deprecated_core_disable_verify_ssl
# by default, hook will try in_cluster first. so we only need to
# apply core airflow config and alert when False and in_cluster not otherwise set.
if in_cluster is None and self._deprecated_core_in_cluster is False:
deprecation_warnings.append(('in_cluster', self._deprecated_core_in_cluster))
in_cluster = self._deprecated_core_in_cluster
if not cluster_context and self._deprecated_core_cluster_context:
deprecation_warnings.append(('cluster_context', self._deprecated_core_cluster_context))
cluster_context = self._deprecated_core_cluster_context
if not kubeconfig_path and self._deprecated_core_config_file:
deprecation_warnings.append(('config_file', self._deprecated_core_config_file))
kubeconfig_path = self._deprecated_core_config_file
if disable_tcp_keepalive is None and self._deprecated_core_disable_tcp_keepalive is True:
deprecation_warnings.append(('enable_tcp_keepalive', False))
disable_tcp_keepalive = True
if deprecation_warnings:
self._deprecation_warning_core_param(deprecation_warnings)
# END apply settings from core kubernetes configuration

if disable_verify_ssl is True:
_disable_verify_ssl()
if disable_tcp_keepalive is not True:
_enable_tcp_keepalive()

if in_cluster:
self.log.debug("loading kube_config from: in_cluster configuration")
config.load_incluster_config()
Expand Down Expand Up @@ -316,3 +406,18 @@ def get_pod_logs(
_preload_content=False,
namespace=namespace if namespace else self.get_namespace(),
)


def _get_bool(val) -> Optional[bool]:
"""
Converts val to bool if can be done with certainty.
If we cannot infer intention we return None.
"""
if isinstance(val, bool):
return val
elif isinstance(val, str):
if val.strip().lower() == 'true':
return True
elif val.strip().lower() == 'false':
return False
return None
61 changes: 48 additions & 13 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@

from kubernetes.client import CoreV1Api, models as k8s

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client, pod_generator
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.secret import Secret
from airflow.models import BaseOperator
Expand Down Expand Up @@ -142,6 +143,7 @@ class KubernetesPodOperator(BaseOperator):
:param priority_class_name: priority class name for the launched Pod
:param termination_grace_period: Termination grace period if task killed in UI,
defaults to kubernetes default
:param kubernetes_conn_id: To retrieve credentials for your k8s cluster from an Airflow connection
"""

BASE_CONTAINER_NAME = 'base'
Expand Down Expand Up @@ -209,7 +211,6 @@ def __init__(
if kwargs.get('xcom_push') is not None:
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
super().__init__(resources=None, **kwargs)

self.kubernetes_conn_id = kubernetes_conn_id
self.do_xcom_push = do_xcom_push
self.image = image
Expand Down Expand Up @@ -324,19 +325,20 @@ def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool
def pod_manager(self) -> PodManager:
return PodManager(kube_client=self.client)

@cached_property
def client(self) -> CoreV1Api:
if self.kubernetes_conn_id:
hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
return hook.core_v1_client

kwargs: Dict[str, Any] = dict(
cluster_context=self.cluster_context,
def get_hook(self):
hook = KubernetesHook(
conn_id=self.kubernetes_conn_id,
in_cluster=self.in_cluster,
config_file=self.config_file,
cluster_context=self.cluster_context,
)
if self.in_cluster is not None:
kwargs.update(in_cluster=self.in_cluster)
return kube_client.get_kube_client(**kwargs)
self._patch_deprecated_k8s_settings(hook)
return hook

@cached_property
def client(self) -> CoreV1Api:
hook = self.get_hook()
return hook.core_v1_client

def find_pod(self, namespace, context, *, exclude_checked=True) -> Optional[k8s.V1Pod]:
"""Returns an already-running pod for this task instance if one exists."""
Expand Down Expand Up @@ -573,6 +575,39 @@ def dry_run(self) -> None:
pod = self.build_pod_request_obj()
print(yaml.dump(prune_dict(pod.to_dict(), mode='strict')))

def _patch_deprecated_k8s_settings(self, hook: KubernetesHook):
    """
    Transfer legacy ``[kubernetes]`` settings from core Airflow config onto ``hook``.

    In a future release this section will no longer be consulted and users will
    have to configure KPO through an Airflow connection. Until then, any value
    found there that needs to take effect is written to the hook's special
    ``_deprecated_core_*`` attributes.

    :param hook: the hook whose ``_deprecated_core_*`` attributes are patched in place.
    """
    # enable_tcp_keepalive defaults to True, so only an explicit False needs forwarding
    keepalive_enabled = conf.getboolean('kubernetes', 'enable_tcp_keepalive')
    if keepalive_enabled is False:
        hook._deprecated_core_disable_tcp_keepalive = True

    # verify_ssl defaults to True, so only an explicit False needs forwarding
    ssl_verified = conf.getboolean('kubernetes', 'verify_ssl')
    if ssl_verified is False:
        hook._deprecated_core_disable_verify_ssl = True

    # in_cluster defaults to True; forward False only when the operator did not set it
    core_in_cluster = conf.getboolean('kubernetes', 'in_cluster')
    if self.in_cluster is None and core_in_cluster is False:
        hook._deprecated_core_in_cluster = core_in_cluster

    # cluster_context and config_file have no default: forward whatever is configured,
    # unless the operator was given its own value for the same setting
    operator_values = (
        ('cluster_context', self.cluster_context),
        ('config_file', self.config_file),
    )
    for option, operator_value in operator_values:
        core_value = conf.get('kubernetes', option, fallback=None)
        if not operator_value and core_value:
            setattr(hook, f"_deprecated_core_{option}", core_value)


class _suppress(AbstractContextManager):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,17 @@ Kube config (JSON format)
Namespace
Default Kubernetes namespace for the connection.

When specifying the connection in environment variable you should specify
it using URI syntax.
Cluster context
When using a kube config, you can specify which context to use.

Note that all components of the URI should be URL-encoded.
Disable verify SSL
Can optionally disable SSL certificate verification. By default SSL is verified.

For example:
Disable TCP keepalive
TCP keepalive is a feature (enabled by default) that tries to keep long-running connections
alive. Set this parameter to True to disable this feature.

Example storing connection in env var using URI format:

.. code-block:: bash

Expand Down
Loading