Skip to content

Commit

Permalink
Much bigger changes around metadata validation
Browse files Browse the repository at this point in the history
to enable generation of a descriptive YML
file which can then be processed so that
certain changes can be applied
automatically. However, these changes
will be maintained for a future release.
The initial set of simpler changes will
be split off into a simpler, preliminary
release.
  • Loading branch information
pyth0n1c committed Oct 9, 2024
1 parent c457b3d commit a166803
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 38 deletions.
64 changes: 40 additions & 24 deletions contentctl/actions/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
DetectionIDError,
DetectionMissingError,
VersionDecrementedError,
VersionBumpingError
VersionBumpingError,
MetadataValidationErrorFile
)

from contentctl.input.director import DirectorOutputDto
from contentctl.objects.detection import Detection

@dataclass(frozen=True)
class InspectInputDto:
Expand All @@ -27,14 +29,14 @@ class InspectInputDto:

class Inspect:

def execute(self, config: inspect) -> str:
def execute(self, config: inspect, director:DirectorOutputDto) -> str:
if config.build_app or config.build_api:

self.inspectAppCLI(config)
appinspect_token = self.inspectAppAPI(config)
#self.inspectAppCLI(config)
#appinspect_token = self.inspectAppAPI(config)

if config.enable_metadata_validation:
self.check_detection_metadata(config)
self.check_detection_metadata(config, director)
else:
print("🟡 Detection metadata validation disabled, skipping.")

Expand Down Expand Up @@ -266,7 +268,26 @@ def parseAppinspectJsonLogFile(

return

def check_detection_metadata(self, config: inspect) -> None:

def print_and_dump_metadata_errors_to_yml_file(self, config:inspect, errors_list: list[MetadataValidationError]) -> None:
"""During validation, there may be a significant number of errors.
In fact, there may be so many that when running at the command line,
they are lost in the backscroll. Additionally, it may be extremely
helpful to have these errors parsed by a separate program that can
then update the relevant files to fix the error automatically.
This function writes out a strucuted file containing all errors
to enable review and parsing by followon tools.
Args:
validation_errors (dict[str, list[MetadataValidationError]]):List of all errors for output.
"""
m = MetadataValidationErrorFile.parse_from_errors_list(errors_list)
m.print_errors()
m.write_to_file(config.metadata_results_file)

#def emit_metadata_validation_summary(self, )

def check_detection_metadata(self, config: inspect, director:DirectorOutputDto) -> None:
"""
Using a previous build, compare the savedsearches.conf files to detect any issues w/
detection metadata. **NOTE**: Detection metadata validation can only be performed between
Expand Down Expand Up @@ -295,12 +316,15 @@ def check_detection_metadata(self, config: inspect) -> None:
validation_errors: dict[str, list[MetadataValidationError]] = {}
for rule_name in previous_build_conf.detection_stanzas:
validation_errors[rule_name] = []
# No detections should be removed from build to build
# No detections should be removed from build to build.
# However, this error can be converted to a warning via
# the following command line flag. This functionality exists
# to support validation of public-only vs public+private content
if rule_name not in current_build_conf.detection_stanzas:
if config.supress_missing_content_exceptions:
print(f"[SUPPRESSED] {DetectionMissingError(rule_name=rule_name).long_message}")
else:
validation_errors[rule_name].append(DetectionMissingError(rule_name=rule_name))
validation_errors[rule_name].append(
DetectionMissingError(rule_name=rule_name,
suppress_as_warning=config.suppress_missing_content_exceptions))

continue
# Pull out the individual stanza for readability
previous_stanza = previous_build_conf.detection_stanzas[rule_name]
Expand All @@ -310,6 +334,7 @@ def check_detection_metadata(self, config: inspect) -> None:
if current_stanza.metadata.detection_id != previous_stanza.metadata.detection_id:
validation_errors[rule_name].append(
DetectionIDError(
file_path=Detection.get_detection_object_from_stanza_name(rule_name, config.app, director.detections).file_path,
rule_name=rule_name,
current_id=current_stanza.metadata.detection_id,
previous_id=previous_stanza.metadata.detection_id
Expand All @@ -320,6 +345,7 @@ def check_detection_metadata(self, config: inspect) -> None:
if current_stanza.metadata.detection_version < previous_stanza.metadata.detection_version:
validation_errors[rule_name].append(
VersionDecrementedError(
file_path=Detection.get_detection_object_from_stanza_name(rule_name, config.app, director.detections).file_path,
rule_name=rule_name,
current_version=current_stanza.metadata.detection_version,
previous_version=previous_stanza.metadata.detection_version
Expand All @@ -330,6 +356,7 @@ def check_detection_metadata(self, config: inspect) -> None:
if current_stanza.version_should_be_bumped(previous_stanza):
validation_errors[rule_name].append(
VersionBumpingError(
file_path=Detection.get_detection_object_from_stanza_name(rule_name, config.app, director.detections).file_path,
rule_name=rule_name,
current_version=current_stanza.metadata.detection_version,
previous_version=previous_stanza.metadata.detection_version
Expand All @@ -339,18 +366,7 @@ def check_detection_metadata(self, config: inspect) -> None:
# Convert our dict mapping to a flat list of errors for use in reporting
validation_error_list = [x for inner_list in validation_errors.values() for x in inner_list]

# Report failure/success
print("\nDetection Metadata Validation:")
if len(validation_error_list) > 0:
# Iterate over each rule and report the failures
for rule_name in validation_errors:
if len(validation_errors[rule_name]) > 0:
print(f"\t{rule_name}")
for error in validation_errors[rule_name]:
print(f"\t\t🔸 {error.short_message}")
else:
# If no errors in the list, report success
print("\t✅ Detection metadata looks good and all versions were bumped appropriately :)")
self.print_and_dump_metadata_errors_to_yml_file(config, validation_error_list)

# Raise an ExceptionGroup for all validation issues
if len(validation_error_list) > 0:
Expand Down
4 changes: 2 additions & 2 deletions contentctl/contentctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def build_func(config:build)->DirectorOutputDto:

def inspect_func(config:inspect)->str:
#Make sure that we have built the most recent version of the app
_ = build_func(config)
inspect_token = Inspect().execute(config)
director = build_func(config)
inspect_token = Inspect().execute(config, director)
return inspect_token


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Union, Optional, List, Any, Annotated
from typing import TYPE_CHECKING, Union, Optional, List, Any, Annotated, Self
import re
import pathlib
from enum import Enum
Expand Down Expand Up @@ -882,6 +882,27 @@ def all_tests_successful(self) -> bool:
# If all tests are successful (PASS/SKIP), return True
return True

@classmethod
def get_detection_object_from_stanza_name(cls, stanza_name: str, app:CustomApp, detections: list[Self]) -> Self:
"""Just as we have a way to go from detection to conf stanza, provide a mapping to go from conf stanza to detection.
Args:
stanza_name (str): The stanza name, probably read from savedsearches.conf
app (CustomApp): The app that was used to build the conf file.
detections (list[Detection_Abstract]): List of all detections in this repo
Raises:
Exception: An exception will be raised if the expected detection is not found
Returns:
Detection_Abstract: return the Detection_Abtract object with the appropriate name
"""
matching_detections = list(filter(lambda d: d.get_conf_stanza_name(app) == stanza_name, detections))
if len(matching_detections) == 1:
return matching_detections[0]
raise Exception(f"Failed to find exactly one detection with the conf stanza '{stanza_name}'")


def get_summary(
self,
detection_fields: list[str] = [
Expand Down
9 changes: 8 additions & 1 deletion contentctl/objects/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class inspect(build):
"should be enabled."
)
)
supress_missing_content_exceptions: bool = Field(
suppress_missing_content_exceptions: bool = Field(
default=False,
description=(
"Suppress exceptions during metadata validation if a detection that existed in "
Expand Down Expand Up @@ -336,6 +336,13 @@ class inspect(build):
)
stack_type: StackType = Field(description="The type of your Splunk Cloud Stack")

@property
def metadata_results_file(self) -> pathlib.Path:

return self.getPackageFilePath().parent / (self.getPackageFilePath().name + ".metadata-validation-results.yml")



@field_validator("enrichments", mode="after")
@classmethod
def validate_needed_flags_metadata_validation(cls, v: bool, info: ValidationInfo) -> bool:
Expand Down
Loading

0 comments on commit a166803

Please sign in to comment.