Skip to content

Commit

Permalink
add k8s library
Browse files Browse the repository at this point in the history
  • Loading branch information
fred-labs committed Jul 26, 2024
1 parent 0ac88f6 commit 6060e0d
Show file tree
Hide file tree
Showing 14 changed files with 537 additions and 0 deletions.
1 change: 1 addition & 0 deletions libs/scenario_execution_kubernetes/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include scenario_execution_kubernetes/lib_osc/*.osc
7 changes: 7 additions & 0 deletions libs/scenario_execution_kubernetes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Scenario Execution Kubernetes

This packages contains custom conditions and actions to interact with [Kubernetes](https://kubernetes.io/) cluster.

It provides the following scenario execution libraries:

- `kubernetes.osc`: Kubernetes specific actions such as creating or deleting pods.
21 changes: 21 additions & 0 deletions libs/scenario_execution_kubernetes/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>scenario_execution_kubernetes</name>
<version>1.1.0</version>
<description>Package for scenario execution kubernetes library</description>
<author email="scenario-execution@intel.com">Intel Labs</author>
<maintainer email="scenario-execution@intel.com">Intel Labs</maintainer>
<license file="../../LICENSE">Apache-2.0</license>

<exec_depend>scenario_execution</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright (C) 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (C) 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0


def get_kubernetes_library():
return 'scenario_execution_kubernetes', 'kubernetes.osc'
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright (C) 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

from kubernetes import client, config, utils
from enum import Enum
import py_trees
import json
from scenario_execution.actions.base_action import BaseAction


class KubernetesCreateFromYamlActionState(Enum):
IDLE = 1
CREATION_REQUESTED = 2
FAILURE = 3


class KubernetesCreateFromYaml(BaseAction):

def __init__(self, namespace: str, yaml_file: str, within_cluster: bool):
super().__init__()
self.namespace = namespace
self.yaml_file = yaml_file
self.within_cluster = within_cluster
self.client = None
self.current_state = KubernetesCreateFromYamlActionState.IDLE
self.current_request = None

def setup(self, **kwargs):
if self.within_cluster:
config.load_incluster_config()
else:
config.load_kube_config()
self.client = client.api_client.ApiClient()

def update(self) -> py_trees.common.Status: # pylint: disable=too-many-return-statements
if self.current_state == KubernetesCreateFromYamlActionState.IDLE:
self.current_request = utils.create_from_yaml(
self.client, self.yaml_file, verbose=False, namespace=self.namespace, async_req=True)
self.current_state = KubernetesCreateFromYamlActionState.CREATION_REQUESTED
self.feedback_message = f"Requested creation from yaml file '{self.yaml_file}' in namespace '{self.namespace}'" # pylint: disable= attribute-defined-outside-init
return py_trees.common.Status.RUNNING
elif self.current_state == KubernetesCreateFromYamlActionState.CREATION_REQUESTED:
success = True
for reqs in self.current_request:
for req in reqs:
if req.ready():
if not req.successful():
try:
req.get()
except client.exceptions.ApiException as e:
message = ""
body = json.loads(e.body)
if "message" in body:
message = f", message: '{body['message']}'"
self.feedback_message = f"Failure! Reason: {e.reason} {message}" # pylint: disable= attribute-defined-outside-init
success = False
else:
return py_trees.common.Status.RUNNING
if success:
return py_trees.common.Status.SUCCESS
else:
return py_trees.common.Status.FAILURE
return py_trees.common.Status.FAILURE
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright (C) 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

from kubernetes import client, config
from enum import Enum
import py_trees
import re
from scenario_execution.actions.base_action import BaseAction


class KubernetesElementType(Enum):
POD = 1


class KubernetesDeleteActionState(Enum):
IDLE = 1
WAITING_FOR_LIST_RUNNING = 2
DELETE_REQUESTED = 3
WAIT_FOR_DELETION = 4
FAILURE = 5


class KubernetesDelete(BaseAction):

def __init__(self, target: str, namespace: str, element_type: str, grace_period: int, regex: bool, within_cluster: bool):
super().__init__()
self.target = target
self.namespace = namespace
if element_type == "pod":
self.element_type = KubernetesElementType.POD
else:
raise ValueError(f"element_type {element_type} unknown.")
self.grace_period = grace_period
self.regex = regex
self.within_cluster = within_cluster
self.client = None
self.current_state = KubernetesDeleteActionState.IDLE
self.current_request = None

def setup(self, **kwargs):
if self.within_cluster:
config.load_incluster_config()
else:
config.load_kube_config()

self.client = client.CoreV1Api()

def update(self) -> py_trees.common.Status: # pylint: disable=too-many-return-statements
if self.current_state == KubernetesDeleteActionState.IDLE:
self.current_state = KubernetesDeleteActionState.WAITING_FOR_LIST_RUNNING
self.feedback_message = f"Requesting list of running pods in namespace '{self.namespace}'" # pylint: disable= attribute-defined-outside-init
self.current_request = self.client.list_namespaced_pod(
namespace=self.namespace, async_req=True)
return py_trees.common.Status.RUNNING
elif self.current_state == KubernetesDeleteActionState.WAITING_FOR_LIST_RUNNING:
if self.current_request.ready():
running_pods = []
for i in self.current_request.get().items:
running_pods.append(i.metadata.name)

found_pod = None
if self.regex:
matched_pods = []
for pod in running_pods:
if re.search(self.target, pod):
matched_pods.append(pod)
if matched_pods:
if len(matched_pods) > 1:
self.feedback_message = f"'{self.target}' regex identified more than one pod {', '.join(matched_pods)}. Only one pod is supported!" # pylint: disable= attribute-defined-outside-init
return py_trees.common.Status.FAILURE
found_pod = matched_pods[0]
else:
if self.target in running_pods:
found_pod = self.target
if found_pod:
self.target = found_pod
self.feedback_message = f"'{self.target}' found! Requesting deletion." # pylint: disable= attribute-defined-outside-init
self.current_state = KubernetesDeleteActionState.DELETE_REQUESTED
self.current_request = self.client.delete_namespaced_pod(
found_pod, namespace=self.namespace, grace_period_seconds=int(self.grace_period), async_req=True)
return py_trees.common.Status.RUNNING
else:
self.feedback_message = f"'{self.target}' not found in list of running pods (namespace: '{self.namespace}'). Available: {', '.join(running_pods)}'" # pylint: disable= attribute-defined-outside-init
else:
return py_trees.common.Status.RUNNING
elif self.current_state == KubernetesDeleteActionState.DELETE_REQUESTED:
if self.current_request.ready():
if self.current_request.successful():
self.feedback_message = f"'{self.target}' deletion successfully requested!" # pylint: disable= attribute-defined-outside-init
self.current_request = None
self.current_state = KubernetesDeleteActionState.WAIT_FOR_DELETION
self.current_request = self.client.list_namespaced_pod(
namespace=self.namespace, async_req=True)
return py_trees.common.Status.RUNNING
else:
self.feedback_message = f"Error while deleting '{self.target}' in namespace '{self.namespace}'." # pylint: disable= attribute-defined-outside-init
else:
return py_trees.common.Status.RUNNING
elif self.current_state == KubernetesDeleteActionState.WAIT_FOR_DELETION:
if self.current_request.successful():
running_pods = []
for i in self.current_request.get().items:
running_pods.append(i.metadata.name)
if self.target not in running_pods:
return py_trees.common.Status.SUCCESS
else:
self.feedback_message = f"Waiting for deletion of '{self.target}' in namespace '{self.namespace}'." # pylint: disable= attribute-defined-outside-init
self.current_request = self.client.list_namespaced_pod(
namespace=self.namespace, async_req=True)
return py_trees.common.Status.RUNNING
else:
self.feedback_message = f"Error while requesting list of running pods (after deletion request)." # pylint: disable= attribute-defined-outside-init

return py_trees.common.Status.FAILURE
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright (C) 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

import py_trees
from scenario_execution.actions.base_action import BaseAction
import queue
import threading
from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException


class KubernetesWaitForNetworkPolicyStatus(BaseAction):

def __init__(self, target: str, status: tuple, namespace: str, within_cluster: bool):
super().__init__()
self.target = target
self.namespace = namespace
if not isinstance(status, tuple) or not isinstance(status[0], str):
raise ValueError("Status expected to be enum.")
self.expected_status = status[0]
self.within_cluster = within_cluster
self.network_client = None
self.update_queue = queue.Queue()

def setup(self, **kwargs):
if self.within_cluster:
config.load_incluster_config()
else:
config.load_kube_config()
self.k8s_client = client.api_client.ApiClient()
self.network_client = client.NetworkingV1Api(self.k8s_client)

def execute(self, target: str, status: tuple, namespace: str, within_cluster: bool):
self.monitoring_thread = threading.Thread(target=self.watch_network, daemon=True)
self.monitoring_thread.start()

def update(self) -> py_trees.common.Status:
while not self.update_queue.empty():
item = self.update_queue.get()
if len(item) != 2:
return py_trees.common.Status.FAILURE

if item[0] == self.target:
if item[1].lower() == self.expected_status:
self.feedback_message = f"Expected status '{item[1].lower()}' found." # pylint: disable= attribute-defined-outside-init
return py_trees.common.Status.SUCCESS
else:
self.feedback_message = f"Status changed to '{item[1].lower()}', expected '{self.expected_status}'." # pylint: disable= attribute-defined-outside-init
return py_trees.common.Status.RUNNING

def watch_network(self):
w = watch.Watch()
try:
for event in w.stream(self.network_client.list_namespaced_network_policy, self.namespace):
network_policy_name = event['object'].metadata.name
event_type = event['type']
self.update_queue.put((network_policy_name, event_type))
except ApiException as e:
self.logger.error(f"Error accessing kubernetes: {e}")
self.update_queue.put(())
Loading

0 comments on commit 6060e0d

Please sign in to comment.