Skip to content

Commit

Permalink
Check if embedded cert is base64 encoded before decoding (#485)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson authored Sep 9, 2024
1 parent 6937a15 commit 75d5ace
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 15 deletions.
26 changes: 11 additions & 15 deletions kr8s/_auth.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2024, Kr8s Developers (See LICENSE for list)
# SPDX-License-Identifier: BSD 3-Clause License
import base64
import binascii
import json
import os
import pathlib
Expand Down Expand Up @@ -193,12 +192,10 @@ async def _load_kubeconfig(self) -> None:
)
if "client-key-data" in self._user:
async with NamedTemporaryFile(delete=False) as key_file:
try:
key_data = base64.b64decode(
self._user["client-key-data"], validate=True
)
except binascii.Error:
if "-----" in self._user["client-key-data"]:
key_data = self._user["client-key-data"].encode()
else:
key_data = base64.b64decode(self._user["client-key-data"])
await key_file.write_bytes(key_data)
self.client_key_file = str(key_file)
if "client-certificate" in self._user:
Expand All @@ -212,12 +209,10 @@ async def _load_kubeconfig(self) -> None:
)
if "client-certificate-data" in self._user:
async with NamedTemporaryFile(delete=False) as cert_file:
try:
cert_data = base64.b64decode(
self._user["client-certificate-data"], validate=True
)
except binascii.Error:
if "-----" in self._user["client-certificate-data"]:
cert_data = self._user["client-certificate-data"].encode()
else:
cert_data = base64.b64decode(self._user["client-certificate-data"])
await cert_file.write_bytes(cert_data)
self.client_cert_file = str(cert_file)
if "certificate-authority" in self._cluster:
Expand All @@ -231,12 +226,13 @@ async def _load_kubeconfig(self) -> None:
)
if "certificate-authority-data" in self._cluster:
async with NamedTemporaryFile(delete=False) as ca_file:
try:
if "-----" in self._cluster["certificate-authority-data"]:
ca_data = self._cluster["certificate-authority-data"].encode()
else:
ca_data = base64.b64decode(
self._cluster["certificate-authority-data"], validate=True
self._cluster["certificate-authority-data"]
)
except binascii.Error:
ca_data = self._cluster["certificate-authority-data"].encode()

await ca_file.write_bytes(ca_data)
self.server_ca_file = str(ca_file)
if "token" in self._user:
Expand Down
59 changes: 59 additions & 0 deletions kr8s/tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,55 @@ async def kubeconfig_with_token(k8s_cluster, k8s_token):
yield f.name


@pytest.fixture
async def kubeconfig_with_decoded_certs(k8s_cluster):
kubeconfig = yaml.safe_load(k8s_cluster.kubeconfig_path.read_text())
kubeconfig["clusters"][0]["cluster"]["certificate-authority-data"] = (
base64.b64decode(
kubeconfig["clusters"][0]["cluster"]["certificate-authority-data"]
)
).decode()
kubeconfig["users"][0]["user"]["client-certificate-data"] = (
base64.b64decode(kubeconfig["users"][0]["user"]["client-certificate-data"])
).decode()
kubeconfig["users"][0]["user"]["client-key-data"] = (
base64.b64decode(kubeconfig["users"][0]["user"]["client-key-data"])
).decode()
with tempfile.NamedTemporaryFile() as f:
f.write(yaml.safe_dump(kubeconfig).encode())
f.flush()
yield f.name


@pytest.fixture
async def kubeconfig_with_line_breaks_in_certs(k8s_cluster):
def insert_every(instring: str, substring: str, interval: int) -> str:
"""Insert a substring every interval characters in instring.
Example:
>>> insert_every("abcdefghi", ".", 3)
"abc.def.ghi"
"""
return substring.join(
instring[i : i + interval] for i in range(0, len(instring), interval)
)

kubeconfig = yaml.safe_load(k8s_cluster.kubeconfig_path.read_text())
kubeconfig["clusters"][0]["cluster"]["certificate-authority-data"] = insert_every(
kubeconfig["clusters"][0]["cluster"]["certificate-authority-data"], "\n", 64
)
kubeconfig["users"][0]["user"]["client-certificate-data"] = insert_every(
kubeconfig["users"][0]["user"]["client-certificate-data"], "\n", 64
)
kubeconfig["users"][0]["user"]["client-key-data"] = insert_every(
kubeconfig["users"][0]["user"]["client-key-data"], "\n", 64
)
with tempfile.NamedTemporaryFile() as f:
f.write(yaml.safe_dump(kubeconfig).encode())
f.flush()
yield f.name


@pytest.fixture
async def kubeconfig_with_second_context(k8s_cluster):
# Open kubeconfig and extract the certificates
Expand Down Expand Up @@ -258,3 +307,13 @@ async def test_certs_on_disk(kubeconfig_with_certs_on_disk, absolute):
with kubeconfig_with_certs_on_disk(absolute=absolute) as kubeconfig:
api = await kr8s.asyncio.api(kubeconfig=kubeconfig)
assert await api.get("pods", namespace=kr8s.ALL)


async def test_certs_not_encoded(kubeconfig_with_decoded_certs):
api = await kr8s.asyncio.api(kubeconfig=kubeconfig_with_decoded_certs)
assert await api.get("pods", namespace=kr8s.ALL)


async def test_certs_with_encoded_line_breaks(kubeconfig_with_line_breaks_in_certs):
api = await kr8s.asyncio.api(kubeconfig=kubeconfig_with_line_breaks_in_certs)
assert await api.get("pods", namespace=kr8s.ALL)

0 comments on commit 75d5ace

Please sign in to comment.