From a166803ea51ab361d8c729ebcf997e10f5bdf183 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Wed, 9 Oct 2024 12:02:12 -0700 Subject: [PATCH] Much bigger changes around metadata validation to enable generation of a descriptive YML file which can then be processed so that certain changes can be applied automatically. However, these changes will be maintained for a future release. The initial set of simpler changes will be split off into a simpler, preliminary release. --- contentctl/actions/inspect.py | 64 ++++-- contentctl/contentctl.py | 4 +- .../detection_abstract.py | 23 +- contentctl/objects/config.py | 9 +- contentctl/objects/errors.py | 214 +++++++++++++++++- 5 files changed, 276 insertions(+), 38 deletions(-) diff --git a/contentctl/actions/inspect.py b/contentctl/actions/inspect.py index 1a8bc50d..9ed9967e 100644 --- a/contentctl/actions/inspect.py +++ b/contentctl/actions/inspect.py @@ -16,9 +16,11 @@ DetectionIDError, DetectionMissingError, VersionDecrementedError, - VersionBumpingError + VersionBumpingError, + MetadataValidationErrorFile ) - +from contentctl.input.director import DirectorOutputDto +from contentctl.objects.detection import Detection @dataclass(frozen=True) class InspectInputDto: @@ -27,14 +29,14 @@ class InspectInputDto: class Inspect: - def execute(self, config: inspect) -> str: + def execute(self, config: inspect, director:DirectorOutputDto) -> str: if config.build_app or config.build_api: - self.inspectAppCLI(config) - appinspect_token = self.inspectAppAPI(config) + #self.inspectAppCLI(config) + #appinspect_token = self.inspectAppAPI(config) if config.enable_metadata_validation: - self.check_detection_metadata(config) + self.check_detection_metadata(config, director) else: print("🟡 Detection metadata validation disabled, skipping.") @@ -266,7 +268,26 @@ def parseAppinspectJsonLogFile( return - def check_detection_metadata(self, config: inspect) -> None: + + def print_and_dump_metadata_errors_to_yml_file(self, config:inspect, errors_list: list[MetadataValidationError]) -> None: + """During validation, there may be a significant number of errors. + In fact, there may be so many that when running at the command line, + they are lost in the backscroll. Additionally, it may be extremely + helpful to have these errors parsed by a separate program that can + then update the relevant files to fix the error automatically. + This function writes out a strucuted file containing all errors + to enable review and parsing by followon tools. + + Args: + validation_errors (dict[str, list[MetadataValidationError]]):List of all errors for output. + """ + m = MetadataValidationErrorFile.parse_from_errors_list(errors_list) + m.print_errors() + m.write_to_file(config.metadata_results_file) + + #def emit_metadata_validation_summary(self, ) + + def check_detection_metadata(self, config: inspect, director:DirectorOutputDto) -> None: """ Using a previous build, compare the savedsearches.conf files to detect any issues w/ detection metadata. **NOTE**: Detection metadata validation can only be performed between @@ -295,12 +316,15 @@ def check_detection_metadata(self, config: inspect) -> None: validation_errors: dict[str, list[MetadataValidationError]] = {} for rule_name in previous_build_conf.detection_stanzas: validation_errors[rule_name] = [] - # No detections should be removed from build to build + # No detections should be removed from build to build. + # However, this error can be converted to a warning via + # the following command line flag. This functionality exists + # to support validation of public-only vs public+private content if rule_name not in current_build_conf.detection_stanzas: - if config.supress_missing_content_exceptions: - print(f"[SUPPRESSED] {DetectionMissingError(rule_name=rule_name).long_message}") - else: - validation_errors[rule_name].append(DetectionMissingError(rule_name=rule_name)) + validation_errors[rule_name].append( + DetectionMissingError(rule_name=rule_name, + suppress_as_warning=config.suppress_missing_content_exceptions)) + continue # Pull out the individual stanza for readability previous_stanza = previous_build_conf.detection_stanzas[rule_name] @@ -310,6 +334,7 @@ def check_detection_metadata(self, config: inspect) -> None: if current_stanza.metadata.detection_id != previous_stanza.metadata.detection_id: validation_errors[rule_name].append( DetectionIDError( + file_path=Detection.get_detection_object_from_stanza_name(rule_name, config.app, director.detections).file_path, rule_name=rule_name, current_id=current_stanza.metadata.detection_id, previous_id=previous_stanza.metadata.detection_id @@ -320,6 +345,7 @@ def check_detection_metadata(self, config: inspect) -> None: if current_stanza.metadata.detection_version < previous_stanza.metadata.detection_version: validation_errors[rule_name].append( VersionDecrementedError( + file_path=Detection.get_detection_object_from_stanza_name(rule_name, config.app, director.detections).file_path, rule_name=rule_name, current_version=current_stanza.metadata.detection_version, previous_version=previous_stanza.metadata.detection_version @@ -330,6 +356,7 @@ def check_detection_metadata(self, config: inspect) -> None: if current_stanza.version_should_be_bumped(previous_stanza): validation_errors[rule_name].append( VersionBumpingError( + file_path=Detection.get_detection_object_from_stanza_name(rule_name, config.app, director.detections).file_path, rule_name=rule_name, current_version=current_stanza.metadata.detection_version, previous_version=previous_stanza.metadata.detection_version @@ -339,18 +366,7 @@ def check_detection_metadata(self, config: inspect) -> None: # Convert our dict mapping to a flat list of errors for use in reporting validation_error_list = [x for inner_list in validation_errors.values() for x in inner_list] - # Report failure/success - print("\nDetection Metadata Validation:") - if len(validation_error_list) > 0: - # Iterate over each rule and report the failures - for rule_name in validation_errors: - if len(validation_errors[rule_name]) > 0: - print(f"\t❌ {rule_name}") - for error in validation_errors[rule_name]: - print(f"\t\t🔸 {error.short_message}") - else: - # If no errors in the list, report success - print("\t✅ Detection metadata looks good and all versions were bumped appropriately :)") + self.print_and_dump_metadata_errors_to_yml_file(config, validation_error_list) # Raise an ExceptionGroup for all validation issues if len(validation_error_list) > 0: diff --git a/contentctl/contentctl.py b/contentctl/contentctl.py index dbf434a7..56171eb8 100644 --- a/contentctl/contentctl.py +++ b/contentctl/contentctl.py @@ -81,8 +81,8 @@ def build_func(config:build)->DirectorOutputDto: def inspect_func(config:inspect)->str: #Make sure that we have built the most recent version of the app - _ = build_func(config) - inspect_token = Inspect().execute(config) + director = build_func(config) + inspect_token = Inspect().execute(config, director) return inspect_token diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 1b716097..dba09ca0 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Union, Optional, List, Any, Annotated +from typing import TYPE_CHECKING, Union, Optional, List, Any, Annotated, Self import re import pathlib from enum import Enum @@ -882,6 +882,27 @@ def all_tests_successful(self) -> bool: # If all tests are successful (PASS/SKIP), return True return True + @classmethod + def get_detection_object_from_stanza_name(cls, stanza_name: str, app:CustomApp, detections: list[Self]) -> Self: + """Just as we have a way to go from detection to conf stanza, provide a mapping to go from conf stanza to detection. + + Args: + stanza_name (str): The stanza name, probably read from savedsearches.conf + app (CustomApp): The app that was used to build the conf file. + detections (list[Detection_Abstract]): List of all detections in this repo + + Raises: + Exception: An exception will be raised if the expected detection is not found + + Returns: + Detection_Abstract: return the Detection_Abtract object with the appropriate name + """ + matching_detections = list(filter(lambda d: d.get_conf_stanza_name(app) == stanza_name, detections)) + if len(matching_detections) == 1: + return matching_detections[0] + raise Exception(f"Failed to find exactly one detection with the conf stanza '{stanza_name}'") + + def get_summary( self, detection_fields: list[str] = [ diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py index 76374f50..974909b4 100644 --- a/contentctl/objects/config.py +++ b/contentctl/objects/config.py @@ -308,7 +308,7 @@ class inspect(build): "should be enabled." ) ) - supress_missing_content_exceptions: bool = Field( + suppress_missing_content_exceptions: bool = Field( default=False, description=( "Suppress exceptions during metadata validation if a detection that existed in " @@ -336,6 +336,13 @@ class inspect(build): ) stack_type: StackType = Field(description="The type of your Splunk Cloud Stack") + @property + def metadata_results_file(self) -> pathlib.Path: + + return self.getPackageFilePath().parent / (self.getPackageFilePath().name + ".metadata-validation-results.yml") + + + @field_validator("enrichments", mode="after") @classmethod def validate_needed_flags_metadata_validation(cls, v: bool, info: ValidationInfo) -> bool: diff --git a/contentctl/objects/errors.py b/contentctl/objects/errors.py index 06f7751a..1439bcc4 100644 --- a/contentctl/objects/errors.py +++ b/contentctl/objects/errors.py @@ -1,6 +1,12 @@ +from __future__ import annotations from abc import ABC, abstractmethod +from pydantic import BaseModel, ConfigDict +from pydantic.dataclasses import dataclass from uuid import UUID - +from typing import Self, Sequence +from contentctl.output.yml_writer import YmlWriter +import pathlib +from contentctl.objects.constants import CONTENTCTL_MAX_STANZA_LENGTH class ValidationFailed(Exception): """Indicates not an error in execution, but a validation failure""" @@ -27,8 +33,34 @@ class MetadataValidationError(Exception, ABC): Base class for any errors arising from savedsearches.conf detection metadata validation """ # The name of the rule the error relates to + # rule_name must be set in the _init_ functions of + # individual exceptions, rather than here, because + # it is used to calculate the long_message and + # short_message properties. rule_name: str + # Determine whether or not this Exception + # should actually be treated as a WARNING + # rather than an Exception. This allows + # individual Exceptions to be enabled or + # disabled based on command line flags + suppress_as_warning: bool + + file_path: pathlib.Path + + def __init__( + self, + file_path: pathlib.Path, + rule_name: str, + suppress_as_warning: bool = False, + *args: object + ) -> None: + print(f"file path in MVE: {file_path}") + self.file_path = file_path, + self.rule_name = rule_name + self.suppress_as_warning = suppress_as_warning + super().__init__(self.long_message, *args) + @property @abstractmethod def long_message(self) -> str: @@ -46,6 +78,28 @@ def short_message(self) -> str: :returns: a str, the message """ raise NotImplementedError() + + @staticmethod + @abstractmethod + def error_plain_name() -> str: + """ + A plain, English language name for the error + :returns: a str, the message + """ + raise NotImplementedError() + + def toJSON(self) -> dict[str, str | bool | UUID | int]: + return {"rule_name": self.rule_name, + "suppress_as_warning": self.suppress_as_warning, + "short_message": self.short_message, + "long_message": self.long_message, + "file_path": str(self.file_path)} + + + @classmethod + def sort_and_filter(cls, errors: list[MetadataValidationError]) -> list[Self]: + x = list(filter(lambda error: isinstance(error, cls), errors)) + return sorted(x, key = lambda e: (e.rule_name, e.suppress_as_warning)) class DetectionMissingError(MetadataValidationError): @@ -55,10 +109,11 @@ class DetectionMissingError(MetadataValidationError): def __init__( self, rule_name: str, + suppress_as_warning: bool = False, *args: object ) -> None: self.rule_name = rule_name - super().__init__(self.long_message, *args) + super().__init__(pathlib.Path("missing"), self.rule_name, suppress_as_warning, self.long_message, *args) @property def long_message(self) -> str: @@ -67,7 +122,7 @@ def long_message(self) -> str: :returns: a str, the message """ return ( - f"Rule '{self.rule_name}' in previous build not found in current build; " + f"'{self.rule_name}' in previous build not found in current build; " "detection may have been removed or renamed." ) @@ -78,9 +133,19 @@ def short_message(self) -> str: :returns: a str, the message """ return ( - "Detection from previous build not found in current build." + f"{self.rule_name.ljust(CONTENTCTL_MAX_STANZA_LENGTH)} from previous build not found in current build." ) + @staticmethod + def error_plain_name() -> str: + """ + A plain, English language name for the error + :returns: a str, the message + """ + return "Detection Missing Error" + + + class DetectionIDError(MetadataValidationError): """ @@ -94,15 +159,18 @@ class DetectionIDError(MetadataValidationError): def __init__( self, + file_path:pathlib.Path, rule_name: str, current_id: UUID, previous_id: UUID, + suppress_as_warning: bool = False, *args: object ) -> None: + print(f"file path in DIDE: {file_path}") self.rule_name = rule_name self.current_id = current_id self.previous_id = previous_id - super().__init__(self.long_message, *args) + super().__init__(file_path, self.rule_name, suppress_as_warning, self.long_message, *args) @property def long_message(self) -> str: @@ -123,9 +191,20 @@ def short_message(self) -> str: :returns: a str, the message """ return ( - f"Detection ID {self.current_id} in current build does not match ID {self.previous_id} in previous build." + f"{self.rule_name.ljust(CONTENTCTL_MAX_STANZA_LENGTH)} with id {self.current_id} in current build does not match ID {self.previous_id} in previous build." ) + + @staticmethod + def error_plain_name() -> str: + """ + A plain, English language name for the error + :returns: a str, the message + """ + return "Detection ID Error" + def toJSON(self) -> dict[str, str | bool | UUID | int]: + return super().toJSON() | {"current_id": self.current_id, + "previous_id": self.previous_id} class VersioningError(MetadataValidationError, ABC): """ @@ -139,15 +218,22 @@ class VersioningError(MetadataValidationError, ABC): def __init__( self, + file_path:pathlib.Path, rule_name: str, current_version: int, previous_version: int, + suppress_as_warning: bool = False, *args: object ) -> None: + print(f"file path in VE: {file_path}") self.rule_name = rule_name self.current_version = current_version self.previous_version = previous_version - super().__init__(self.long_message, *args) + super().__init__(file_path, self.rule_name, suppress_as_warning, self.long_message, *args) + + def toJSON(self) -> dict[str, str | bool | UUID | int]: + return super().toJSON() | {"current_version": self.current_version, + "previous_version": self.previous_version} class VersionDecrementedError(VersioningError): @@ -173,15 +259,37 @@ def short_message(self) -> str: :returns: a str, the message """ return ( - f"Detection version ({self.current_version}) in current build is less than version " + f"{self.rule_name.ljust(CONTENTCTL_MAX_STANZA_LENGTH)} with version ({self.current_version}) in current build is less than version " f"({self.previous_version}) in previous build." ) + @staticmethod + def error_plain_name() -> str: + """ + A plain, English language name for the error + :returns: a str, the message + """ + return "Versioning Error" + class VersionBumpingError(VersioningError): """ An error indicating the detection changed but its version wasn't bumped appropriately """ + + @property + def bumped_version(self) -> int: + """ + Returns the verion that we should bump to. + By default, we should bump one version above + the previous_version. However, it is not considered + an ERROR to bump more than 1 if done in the YML. + + Returns: + int: previous_version + 1 + """ + return self.previous_version + 1 + @property def long_message(self) -> str: """ @@ -191,7 +299,7 @@ def long_message(self) -> str: return ( f"Rule '{self.rule_name}' has changed in current build compared to previous " "build (stanza hashes differ); the detection version should be bumped " - f"to at least {self.previous_version + 1}." + f"to at least {self.bumped_version}." ) @property @@ -201,5 +309,91 @@ def short_message(self) -> str: :returns: a str, the message """ return ( - f"Detection version in current build should be bumped to at least {self.previous_version + 1}." + f"{self.rule_name.ljust(CONTENTCTL_MAX_STANZA_LENGTH)} version must be bumped to at least {self.bumped_version}." ) + + @staticmethod + def error_plain_name() -> str: + """ + A plain, English language name for the error + :returns: a str, the message + """ + return "Version Bumping Error" + + def toJSON(self) -> dict[str, str | bool | UUID | int]: + return super().toJSON() | {"current_version": self.current_version, + "previous_version": self.previous_version, + "bumped_version": self.bumped_version} + +class MetadataValidationErrorFile(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + + detectionMissingErrors:list[DetectionMissingError] = [] + detectionIdErrors: list[DetectionIDError] = [] + versionDecrementedErrors: list[VersionDecrementedError] = [] + versionBumpingErrors: list[VersionBumpingError] = [] + + @classmethod + def parse_from_errors_list(cls, errors: list[MetadataValidationError]) -> Self: + return cls( + detectionMissingErrors=DetectionMissingError.sort_and_filter(errors), + detectionIdErrors=DetectionIDError.sort_and_filter(errors), + versionDecrementedErrors=VersionDecrementedError.sort_and_filter(errors), + versionBumpingErrors=VersionBumpingError.sort_and_filter(errors), + ) + + def add_exception(self, exception:MetadataValidationError): + if isinstance(exception, DetectionMissingError): + self.detectionMissingErrors.append(exception) + elif isinstance(exception, DetectionIDError): + self.detectionIdErrors.append(exception) + elif isinstance(exception, VersionDecrementedError): + self.versionDecrementedErrors.append(exception) + elif isinstance(exception, VersionBumpingError): + self.versionBumpingErrors.append(exception) + else: + raise Exception("Unknown error type when generating " + f"Metadata Validation Error File - {type(exception)}") + + + + def cli_format_section(self, errors: Sequence[MetadataValidationError], section_name:str) -> str: + """ + A header to print the the command line + :returns: a str, the message + """ + is_suppressed = True if True in [e.suppress_as_warning for e in errors] else False + + if is_suppressed: + return f"{section_name} - {len(errors)} Suppressed Errors" + + header = f"{section_name} - {len(errors)} Errors" + body = '\n ❌'.join([""]+[error.short_message for error in errors]) + return f"{header}{body}" + + def print_errors(self): + non_suppressed_errors = list(filter(lambda e: not e.suppress_as_warning, + self.detectionMissingErrors + + self.detectionIdErrors + + self.versionDecrementedErrors + + self.versionBumpingErrors)) + if len(non_suppressed_errors) == 0: + print("\t✅ Detection metadata looks good and all versions were bumped appropriately :)") + else: + print(self.cli_format_section(self.detectionMissingErrors, DetectionMissingError.error_plain_name())) + print(self.cli_format_section(self.detectionIdErrors, str(DetectionIDError.error_plain_name()))) + print(self.cli_format_section(self.versionDecrementedErrors, str(VersionDecrementedError.error_plain_name()))) + print(self.cli_format_section(self.versionBumpingErrors, str(VersionBumpingError.error_plain_name()))) + + + + + def write_to_file(self, output_file:pathlib.Path): + output_dict:dict[str,list[dict[str, str | bool | UUID | int]]] = {} + output_dict['detectionMissingErrors'] = [e.toJSON() for e in self.detectionMissingErrors] + output_dict['detectionIdErrors'] = [e.toJSON() for e in self.detectionIdErrors] + output_dict['versionDecrementedErrors'] = [e.toJSON() for e in self.versionDecrementedErrors] + output_dict['versionBumpingErrors'] = [e.toJSON() for e in self.versionBumpingErrors] + YmlWriter.writeYmlFile(str(output_file), output_dict) + +