Skip to content

Commit

Permalink
[DPE-2123] Ensure feature parity between kubectl and lightkube backen…
Browse files Browse the repository at this point in the history
…ds and add integration tests. (#54)

This PR 
* Attempts to make the behavior of `kubectl` and `lightkube` backends
consistent on all service account registry commands.
* Adds integration tests (parameterized by backend) to ensure the
consisitency of two backends.

Jira: [DPE-2123](https://warthogs.atlassian.net/browse/DPE-2123)
  • Loading branch information
theoctober19th authored Jan 11, 2024
1 parent 841bd81 commit e75412f
Show file tree
Hide file tree
Showing 9 changed files with 747 additions and 40 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
.make_cache
__pycache__
.coverage
.vscode
env
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ ignore_missing_imports = true

[tool.poetry]
name = "spark8t"
version = "0.0.2"
version = "0.0.3"
description = "This project allow you to create a SNAP for interacting with a K8s cluster to submit Spark jobs or run spark shells interactively"
authors = [
"Abhishek Verma <abhishek.verma@canonical.com>",
Expand Down
2 changes: 1 addition & 1 deletion spark8t/cli/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def add_logging_arguments(parser: ArgumentParser) -> ArgumentParser:
parser.add_argument(
"--log-level",
choices=["INFO", "WARN", "ERROR", "DEBUG"],
default="INFO",
default="WARN",
help="Set the log level of the logging",
)
parser.add_argument(
Expand Down
23 changes: 14 additions & 9 deletions spark8t/cli/service_account_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
spark_user_parser,
)
from spark8t.domain import PropertyFile, ServiceAccount
from spark8t.exceptions import AccountNotFound, PrimaryAccountNotFound
from spark8t.exceptions import (
AccountNotFound,
PrimaryAccountNotFound,
ResourceAlreadyExists,
)
from spark8t.services import K8sServiceAccountRegistry, parse_conf_overrides
from spark8t.utils import setup_logging

Expand Down Expand Up @@ -100,7 +104,7 @@ def main(args: Namespace, logger: Logger):
kube_interface = get_kube_interface(args)
context = args.context or kube_interface.context_name

logger.info(f"Using K8s context: {context}")
logger.debug(f"Using K8s context: {context}")

registry = K8sServiceAccountRegistry(kube_interface.with_context(context))

Expand Down Expand Up @@ -160,7 +164,7 @@ def main(args: Namespace, logger: Logger):
if maybe_service_account is None:
raise AccountNotFound(input_service_account.id)

maybe_service_account.configurations.log(logger.info)
maybe_service_account.configurations.log(print)

elif args.action == Actions.CLEAR_CONFIG:
registry.set_configurations(
Expand All @@ -173,13 +177,14 @@ def main(args: Namespace, logger: Logger):
if maybe_service_account is None:
raise PrimaryAccountNotFound()

logger.info(maybe_service_account.id)
print(maybe_service_account.id)

elif args.action == Actions.LIST:
for service_account in registry.all():
logger.info(
str.expandtabs(f"{service_account.id}\t{service_account.primary}")
)
print_line = f"{service_account.id}"
if service_account.primary:
print_line += " (Primary)"
print(print_line)


if __name__ == "__main__":
Expand All @@ -194,8 +199,8 @@ def main(args: Namespace, logger: Logger):
try:
main(args, logger)
exit(0)
except (AccountNotFound, PrimaryAccountNotFound) as e:
logger.error(str(e))
except (AccountNotFound, PrimaryAccountNotFound, ResourceAlreadyExists) as e:
print(str(e))
exit(1)
except Exception as e:
raise e
8 changes: 7 additions & 1 deletion spark8t/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,16 @@ def account(self):
return self.resource_name

def __str__(self) -> str:
return f"Account {self.account} not found"
return f"Account {self.account} could not be found."


class FormatError(SyntaxError):
"""Exception to be used when input provided by the user cannot be parsed."""

pass


class ResourceAlreadyExists(FileExistsError):
"""The resource already exists in the K8s cluster."""

pass
118 changes: 90 additions & 28 deletions spark8t/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
PropertyFile,
ServiceAccount,
)
from spark8t.exceptions import AccountNotFound, FormatError, K8sResourceNotFound
from spark8t.exceptions import (
AccountNotFound,
FormatError,
K8sResourceNotFound,
ResourceAlreadyExists,
)
from spark8t.literals import MANAGED_BY_LABELNAME, PRIMARY_LABELNAME, SPARK8S_LABEL
from spark8t.utils import (
PercentEncodingSerializer,
Expand Down Expand Up @@ -366,20 +371,36 @@ def get_service_accounts(
k, v = PropertyFile.parse_property_line(entry)
labels_to_pass[k] = v

if not namespace:
namespace = "default"
all_namespaces = []

with io.StringIO() as buffer:
codecs.dump_all_yaml(
self.client.list(
res=LightKubeServiceAccount,
namespace=namespace,
labels=labels_to_pass,
),
buffer,
if not namespace:
# means all namespaces
iterator = self.client.list(
res=Namespace,
)
buffer.seek(0)
return list(yaml.safe_load_all(buffer))
for ns in iterator:
all_namespaces.append(ns.metadata.name)

else:
all_namespaces = [
namespace,
]

result = []
for namespace in all_namespaces:
with io.StringIO() as buffer:
codecs.dump_all_yaml(
self.client.list(
res=LightKubeServiceAccount,
namespace=namespace,
labels=labels_to_pass,
),
buffer,
)
buffer.seek(0)
result += list(yaml.safe_load_all(buffer))

return result

def get_secret(
self, secret_name: str, namespace: Optional[str] = None
Expand All @@ -390,21 +411,27 @@ def get_secret(
secret_name: name of the secret
namespace: namespace where the secret is contained
"""
try:
with io.StringIO() as buffer:
codecs.dump_all_yaml(
[
self.client.get(
res=Secret, namespace=namespace, name=secret_name
)
],
buffer,
)
buffer.seek(0)
secret = yaml.safe_load(buffer)

with io.StringIO() as buffer:
codecs.dump_all_yaml(
[self.client.get(res=Secret, namespace=namespace, name=secret_name)],
buffer,
)
buffer.seek(0)
secret = yaml.safe_load(buffer)

result = dict()
for k, v in secret["data"].items():
result[k] = base64.b64decode(v).decode("utf-8")
result = dict()
for k, v in secret["data"].items():
result[k] = base64.b64decode(v).decode("utf-8")

secret["data"] = result
return secret
secret["data"] = result
return secret
except Exception:
raise K8sResourceNotFound(secret_name, KubernetesResourceType.SECRET)

def set_label(
self,
Expand Down Expand Up @@ -755,7 +782,7 @@ def get_service_account(
f"Error retrieving account id {account_id} in namespace {namespace}"
)

self.logger.warning(service_account_raw)
self.logger.debug(service_account_raw)

return service_account_raw

Expand Down Expand Up @@ -792,7 +819,6 @@ def get_secret(
secret_name: name of the secret
namespace: namespace where the secret is contained
"""

try:
secret = self.exec(
f"get secret {secret_name} --ignore-not-found",
Expand Down Expand Up @@ -1154,6 +1180,37 @@ def create(self, service_account: ServiceAccount) -> str:
rolename = username + "-role"
rolebindingname = username + "-role-binding"

# Check if the resources to be created already exist in K8s cluster
if self.kube_interface.exists(
KubernetesResourceType.SERVICEACCOUNT,
username,
namespace=service_account.namespace,
):
raise ResourceAlreadyExists(
"Could not create the service account. "
f"A {KubernetesResourceType.SERVICEACCOUNT} with name '{username}' already exists."
)

if self.kube_interface.exists(
KubernetesResourceType.ROLE,
rolename,
namespace=service_account.namespace,
):
raise ResourceAlreadyExists(
"Could not create the service account. "
f"A {KubernetesResourceType.ROLE} with name '{rolename}' already exists."
)

if self.kube_interface.exists(
KubernetesResourceType.ROLEBINDING,
rolebindingname,
namespace=service_account.namespace,
):
raise ResourceAlreadyExists(
"Could not create the service account. "
f"A {KubernetesResourceType.ROLEBINDING} with name '{rolebindingname}' already exists."
)

self.kube_interface.create(
KubernetesResourceType.SERVICEACCOUNT,
username,
Expand Down Expand Up @@ -1273,6 +1330,11 @@ def delete(self, account_id: str) -> str:
rolename = name + "-role"
rolebindingname = name + "-role-binding"

if not self.kube_interface.exists(
KubernetesResourceType.SERVICEACCOUNT, name, namespace=namespace
):
raise AccountNotFound(name)

try:
self.kube_interface.delete(
KubernetesResourceType.SERVICEACCOUNT, name, namespace=namespace
Expand Down
Loading

0 comments on commit e75412f

Please sign in to comment.