Skip to content

Commit

Permalink
Update Kubernetes library version
Browse files Browse the repository at this point in the history
Previously we pinned this version as v12 as a change to Kube library
internals meant v1.Pod objects now have a logger object inside them, and
couldn't be pickled on Python 3.6.

To fix that we have "backported" the change in Python 3.7 to make Logger
objects be pickled "by name". (In Python 3.7 the change adds
`__reduce__` methods on to the Logger and RootLogger objects, but here
we achieve it `copyreg` stdlib module so we don't monkeypatch
anything.)

This fix is also applied in to airflow core in a separate commit, but we
also apply it here in the provider so that cncf.kubernetes client
library can be updated but still used with older versions of Airflow
that don't have this fix in.
  • Loading branch information
ashb committed Jan 4, 2022
1 parent 9f135bc commit 0d1dc45
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 5 deletions.
27 changes: 27 additions & 0 deletions airflow/providers/cncf/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,30 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys

if sys.version_info < (3, 7):
# This is needed because the Python Kubernetes client >= 12.0 contains a logging object, meaning that
# v1.Pod et al. are not pickleable on Python 3.6.

# Python 3.7 added this via https://bugs.python.org/issue30520 in 2017 -- but Python 3.6 doesn't have this
# method.

# This is duplicated/backported from airflow.logging_config in 2.2, but by having it here as well it means
# that we can update the version used in this provider and have it work for older versions
import copyreg
import logging

def _reduce_Logger(logger):
if logging.getLogger(logger.name) is not logger:
import pickle

raise pickle.PicklingError('logger cannot be pickled')
return logging.getLogger, (logger.name,)

def _reduce_RootLogger(logger):
return logging.getLogger, ()

if logging.Logger not in copyreg.dispatch_table:
copyreg.pickle(logging.Logger, _reduce_Logger)
copyreg.pickle(logging.RootLogger, _reduce_RootLogger)
12 changes: 9 additions & 3 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import time
from contextlib import closing
from datetime import datetime
from typing import Iterable, Optional, Tuple, Union
from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Union

import pendulum
import tenacity
from kubernetes import client, watch
from kubernetes.client.models.v1_event_list import V1EventList
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
Expand All @@ -38,6 +37,13 @@
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
try:
# Kube >= 19
from kubernetes.client.models.core_v1_event_list import CoreV1EventList as V1EventList
except ImportError:
from kubernetes.client.models.v1_event_list import V1EventList


class PodLaunchFailedException(AirflowException):
"""When pod launching fails in KubernetesPodOperator."""
Expand Down Expand Up @@ -293,7 +299,7 @@ def read_pod_logs(
raise

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def read_pod_events(self, pod: V1Pod) -> V1EventList:
def read_pod_events(self, pod: V1Pod) -> "V1EventList":
"""Reads events from the POD"""
try:
return self._client.list_namespaced_event(
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
]
kubernetes = [
'cryptography>=2.0.0',
'kubernetes>=3.0.0, <12.0.0',
'kubernetes>=3.0.0',
]
kylin = ['kylinpy>=2.6']
ldap = [
Expand Down
6 changes: 5 additions & 1 deletion tests/kubernetes/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,9 @@ def test_disable_verify_ssl(self):

_disable_verify_ssl()

configuration = Configuration()
# Support wide range of kube client libraries
if hasattr(Configuration, 'get_default_copy'):
configuration = Configuration.get_default_copy()
else:
configuration = Configuration()
self.assertFalse(configuration.verify_ssl)

0 comments on commit 0d1dc45

Please sign in to comment.