Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Secret Masking to Rule Enfocement API View Output #6170

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Added
Contributed by @cognifloyd
* Build of ST2 EL9 packages #6153
Contributed by @amanda11
* Add Secret Masking to RuleEnforcementApiView Controller. #6170
Contributed by @philipphomberger

3.8.1 - December 13, 2023
-------------------------
Expand Down
56 changes: 49 additions & 7 deletions st2api/st2api/controllers/v1/rule_enforcement_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from st2api.controllers.resource import ResourceController


__all__ = ["RuleEnforcementViewController"]


Expand Down Expand Up @@ -67,10 +68,34 @@ def get_all(
raw_filters=raw_filters,
requester_user=requester_user,
)

rule_enforcement_apis.json = self._append_view_properties(
rule_enforcement_apis.json
)

rule_enforcement_apis = eval(str(rule_enforcement_apis.json))
i = 0
for rule_enforcement_api in rule_enforcement_apis:
secret_parameter = []
if "parameters" in str(rule_enforcement_apis[i]):
for parameter in rule_enforcement_api["execution"]["action"][
"parameters"
]:
if "secret" in str(
rule_enforcement_api["execution"]["action"]["parameters"][
parameter
]
):
if rule_enforcement_api["execution"]["action"]["parameters"][
parameter
]["secret"]:
secret_parameter.append(parameter)
for secret in rule_enforcement_api["execution"]["parameters"]:
if secret in str(secret_parameter):
rule_enforcement_api["execution"]["parameters"][
secret
] = "*******"
rule_enforcement_apis[i] = rule_enforcement_api
i = i + 1
return rule_enforcement_apis

def get_one(self, id, requester_user):
Expand All @@ -84,6 +109,26 @@ def get_one(self, id, requester_user):
rule_enforcement_api = self._append_view_properties(
[rule_enforcement_api.__json__()]
)[0]
input_string = str(rule_enforcement_api)
input_string = input_string.replace("**", "")
input_string = input_string.replace("TriggerInstanceAPI(", "")
input_string = input_string.replace("ActionExecutionAPI(", "")
input_string = input_string.replace("})", "}")
data_dict = eval(input_string)

rule_enforcement_api = data_dict
secret_parameter = []
for parameter in rule_enforcement_api["execution"]["action"]["parameters"]:
if "secret" in str(
rule_enforcement_api["execution"]["action"]["parameters"][parameter]
):
if rule_enforcement_api["execution"]["action"]["parameters"][parameter][
"secret"
]:
secret_parameter.append(parameter)
for secret in rule_enforcement_api["execution"]["parameters"]:
if secret in str(secret_parameter):
rule_enforcement_api["execution"]["parameters"][secret] = "*******"
return rule_enforcement_api

def _append_view_properties(self, rule_enforcement_apis):
Expand All @@ -93,7 +138,6 @@ def _append_view_properties(self, rule_enforcement_apis):
"""
trigger_instance_ids = set([])
execution_ids = []

for rule_enforcement_api in rule_enforcement_apis:
if rule_enforcement_api.get("trigger_instance_id", None):
trigger_instance_ids.add(
Expand All @@ -118,20 +162,18 @@ def _append_view_properties(self, rule_enforcement_apis):
execution_dbs = ActionExecution.query(
id__in=execution_ids, only_fields=only_fields
)

execution_dbs_by_id = {}
for execution_db in execution_dbs:
execution_dbs_by_id[str(execution_db.id)] = execution_db

# 2. Retrieve corresponding trigger instance objects
trigger_instance_dbs = TriggerInstance.query(id__in=list(trigger_instance_ids))

trigger_instance_dbs_by_id = {}

for trigger_instance_db in trigger_instance_dbs:
trigger_instance_dbs_by_id[
str(trigger_instance_db.id)
] = trigger_instance_db
trigger_instance_dbs_by_id[str(trigger_instance_db.id)] = (
trigger_instance_db
)

# Ammend rule enforcement objects with additional data
for rule_enforcement_api in rule_enforcement_apis:
Expand Down
Loading