Skip to content

Commit

Permalink
Merge branch 'main' into add_detection_type_list
Browse files Browse the repository at this point in the history
  • Loading branch information
pyth0n1c authored Sep 26, 2024
2 parents 7d9d128 + a199c72 commit 506bbaf
Show file tree
Hide file tree
Showing 33 changed files with 465 additions and 180 deletions.
1 change: 1 addition & 0 deletions contentctl/actions/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def execute(self, input_dto: BuildInputDto) -> DirectorOutputDto:
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.investigations, SecurityContentType.investigations))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.lookups, SecurityContentType.lookups))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.macros, SecurityContentType.macros))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.dashboards, SecurityContentType.dashboards))
updated_conf_files.update(conf_output.writeAppConf())

#Ensure that the conf file we just generated/update is syntactically valid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,25 @@ def configure_imported_roles(
):
indexes.append(self.sync_obj.replay_index)
indexes_encoded = ";".join(indexes)

try:
# Set which roles should be configured. For Enterprise Security/Integration Testing,
# we must add some extra foles.
if self.global_config.enable_integration_testing:
roles = imported_roles + enterprise_security_roles
else:
roles = imported_roles

self.get_conn().roles.post(
self.infrastructure.splunk_app_username,
imported_roles=imported_roles + enterprise_security_roles,
imported_roles=roles,
srchIndexesAllowed=indexes_encoded,
srchIndexesDefault=self.sync_obj.replay_index,
)
return
except Exception as e:
self.pbar.write(
f"Enterprise Security Roles do not exist:'{enterprise_security_roles}: {str(e)}"
f"The following role(s) do not exist:'{enterprise_security_roles}: {str(e)}"
)

self.get_conn().roles.post(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,17 @@ def get_docker_client(self):
def check_for_teardown(self):

try:
self.get_docker_client().containers.get(self.get_name())
container: docker.models.containers.Container = self.get_docker_client().containers.get(self.get_name())
except Exception as e:
if self.sync_obj.terminate is not True:
self.pbar.write(
f"Error: could not get container [{self.get_name()}]: {str(e)}"
)
self.sync_obj.terminate = True
else:
if container.status != 'running':
self.sync_obj.terminate = True
self.container = None

if self.sync_obj.terminate:
self.finish()
Expand Down
12 changes: 10 additions & 2 deletions contentctl/actions/new_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ class NewContent:

def buildDetection(self)->dict[str,Any]:
questions = NewContentQuestions.get_questions_detection()
answers = questionary.prompt(questions)
answers: dict[str,str] = questionary.prompt(
questions,
kbi_msg="User did not answer all of the prompt questions. Exiting...")
if not answers:
raise ValueError("User didn't answer one or more questions!")
answers.update(answers)
answers['name'] = answers['detection_name']
del answers['detection_name']
Expand Down Expand Up @@ -70,7 +74,11 @@ def buildDetection(self)->dict[str,Any]:

def buildStory(self)->dict[str,Any]:
questions = NewContentQuestions.get_questions_story()
answers = questionary.prompt(questions)
answers = questionary.prompt(
questions,
kbi_msg="User did not answer all of the prompt questions. Exiting...")
if not answers:
raise ValueError("User didn't answer one or more questions!")
answers['name'] = answers['story_name']
del answers['story_name']
answers['id'] = str(uuid.uuid4())
Expand Down
3 changes: 2 additions & 1 deletion contentctl/actions/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


class Validate:
def execute(self, input_dto: validate) -> DirectorOutputDto:
def execute(self, input_dto: validate) -> DirectorOutputDto:
director_output_dto = DirectorOutputDto(
AtomicEnrichment.getAtomicEnrichment(input_dto),
AttackEnrichment.getAttackEnrichment(input_dto),
Expand All @@ -26,6 +26,7 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
[],
[],
[],
[]
)

director = Director(director_output_dto)
Expand Down
26 changes: 14 additions & 12 deletions contentctl/input/director.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import os
import sys
import pathlib
from typing import Union
from pathlib import Path
from dataclasses import dataclass, field
from pydantic import ValidationError
from uuid import UUID
from contentctl.input.yml_reader import YmlReader


from contentctl.objects.detection import Detection
from contentctl.objects.story import Story

from contentctl.objects.enums import SecurityContentProduct
from contentctl.objects.baseline import Baseline
from contentctl.objects.investigation import Investigation
from contentctl.objects.playbook import Playbook
Expand All @@ -21,20 +18,15 @@
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.data_source import DataSource
from contentctl.objects.event_source import EventSource

from contentctl.objects.dashboard import Dashboard
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment

from contentctl.objects.config import validate
from contentctl.objects.enums import SecurityContentType

from contentctl.objects.enums import DetectionStatus
from contentctl.helper.utils import Utils




@dataclass
class DirectorOutputDto:
# Atomic Tests are first because parsing them
Expand All @@ -50,6 +42,8 @@ class DirectorOutputDto:
macros: list[Macro]
lookups: list[Lookup]
deployments: list[Deployment]
dashboards: list[Dashboard]

data_sources: list[DataSource]
name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict)
uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict)
Expand Down Expand Up @@ -88,6 +82,9 @@ def addContentToDictMappings(self, content: SecurityContentObject):
self.stories.append(content)
elif isinstance(content, Detection):
self.detections.append(content)
elif isinstance(content, Dashboard):
self.dashboards.append(content)

elif isinstance(content, DataSource):
self.data_sources.append(content)
else:
Expand Down Expand Up @@ -115,7 +112,7 @@ def execute(self, input_dto: validate) -> None:
self.createSecurityContent(SecurityContentType.data_sources)
self.createSecurityContent(SecurityContentType.playbooks)
self.createSecurityContent(SecurityContentType.detections)

self.createSecurityContent(SecurityContentType.dashboards)

from contentctl.objects.abstract_security_content_objects.detection_abstract import MISSING_SOURCES
if len(MISSING_SOURCES) > 0:
Expand All @@ -137,6 +134,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
SecurityContentType.playbooks,
SecurityContentType.detections,
SecurityContentType.data_sources,
SecurityContentType.dashboards
]:
files = Utils.get_all_yml_files_from_directory(
os.path.join(self.input_dto.path, str(contentType.name))
Expand All @@ -147,7 +145,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
else:
raise (Exception(f"Cannot createSecurityContent for unknown product {contentType}."))

validation_errors = []
validation_errors:list[tuple[Path,ValueError]] = []

already_ran = False
progress_percent = 0
Expand Down Expand Up @@ -189,6 +187,10 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
elif contentType == SecurityContentType.detections:
detection = Detection.model_validate(modelDict, context={"output_dto":self.output_dto, "app":self.input_dto.app})
self.output_dto.addContentToDictMappings(detection)

elif contentType == SecurityContentType.dashboards:
dashboard = Dashboard.model_validate(modelDict,context={"output_dto":self.output_dto})
self.output_dto.addContentToDictMappings(dashboard)

elif contentType == SecurityContentType.data_sources:
data_source = DataSource.model_validate(
Expand Down
7 changes: 5 additions & 2 deletions contentctl/input/new_content_questions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from typing import Any


class NewContentQuestions:

@classmethod
def get_questions_detection(self) -> list:
def get_questions_detection(cls) -> list[dict[str,Any]]:
questions = [
{
"type": "text",
Expand Down Expand Up @@ -116,7 +119,7 @@ def get_questions_detection(self) -> list:
return questions

@classmethod
def get_questions_story(self) -> list:
def get_questions_story(cls)-> list[dict[str,Any]]:
questions = [
{
"type": "text",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
if TYPE_CHECKING:
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.baseline import Baseline

from contentctl.objects.config import CustomApp

from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.enums import AnalyticsType
from contentctl.objects.enums import DataModel
Expand All @@ -36,10 +37,16 @@
from contentctl.objects.data_source import DataSource
from contentctl.objects.base_test_result import TestResultStatus

# from contentctl.objects.playbook import Playbook
from contentctl.objects.enums import ProvidingTechnology
from contentctl.enrichments.cve_enrichment import CveEnrichmentObj
import datetime
from contentctl.objects.constants import (
ES_MAX_STANZA_LENGTH,
ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE,
CONTENTCTL_MAX_SEARCH_NAME_LENGTH,
CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE
)

MISSING_SOURCES: set[str] = set()

# Those AnalyticsTypes that we do not test via contentctl
Expand All @@ -51,15 +58,25 @@
# TODO (#266): disable the use_enum_values configuration
class Detection_Abstract(SecurityContentObject):
model_config = ConfigDict(use_enum_values=True)

# contentType: SecurityContentType = SecurityContentType.detections
name:str = Field(...,max_length=CONTENTCTL_MAX_SEARCH_NAME_LENGTH)
#contentType: SecurityContentType = SecurityContentType.detections
type: AnalyticsType = Field(...)
status: DetectionStatus = Field(...)
data_source: list[str] = []
tags: DetectionTags = Field(...)
search: str = Field(...)
how_to_implement: str = Field(..., min_length=4)
known_false_positives: str = Field(..., min_length=4)
explanation: None | str = Field(
default=None,
exclude=True, #Don't serialize this value when dumping the object
description="Provide an explanation to be included "
"in the 'Explanation' field of the Detection in "
"the Use Case Library. If this field is not "
"defined in the YML, it will default to the "
"value of the 'description' field when "
"serialized in analyticstories_detections.j2",
)

enabled_by_default: bool = False
file_path: FilePath = Field(...)
Expand All @@ -70,10 +87,30 @@ class Detection_Abstract(SecurityContentObject):
# https://github.com/pydantic/pydantic/issues/9101#issuecomment-2019032541
tests: List[Annotated[Union[UnitTest, IntegrationTest, ManualTest], Field(union_mode='left_to_right')]] = []
# A list of groups of tests, relying on the same data
test_groups: Union[list[TestGroup], None] = Field(None, validate_default=True)
test_groups: list[TestGroup] = []

data_source_objects: list[DataSource] = []

def get_conf_stanza_name(self, app:CustomApp)->str:
stanza_name = CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE.format(app_label=app.label, detection_name=self.name)
self.check_conf_stanza_max_length(stanza_name)
return stanza_name


def get_action_dot_correlationsearch_dot_label(self, app:CustomApp, max_stanza_length:int=ES_MAX_STANZA_LENGTH)->str:
stanza_name = self.get_conf_stanza_name(app)
stanza_name_after_saving_in_es = ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE.format(
security_domain_value = self.tags.security_domain.value,
search_name = stanza_name
)


if len(stanza_name_after_saving_in_es) > max_stanza_length:
raise ValueError(f"label may only be {max_stanza_length} characters to allow updating in-product, "
f"but stanza was actually {len(stanza_name_after_saving_in_es)} characters: '{stanza_name_after_saving_in_es}' ")

return stanza_name

@field_validator("search", mode="before")
@classmethod
def validate_presence_of_filter_macro(cls, value:str, info:ValidationInfo)->str:
Expand Down Expand Up @@ -519,7 +556,7 @@ def model_post_init(self, __context: Any) -> None:
self.data_source_objects = matched_data_sources

for story in self.tags.analytic_story:
story.detections.append(self)
story.detections.append(self)

self.cve_enrichment_func(__context)

Expand Down Expand Up @@ -654,6 +691,27 @@ def addTags_nist(self):
else:
self.tags.nist = [NistCategory.DE_AE]
return self


@model_validator(mode="after")
def ensureThrottlingFieldsExist(self):
'''
For throttling to work properly, the fields to throttle on MUST
exist in the search itself. If not, then we cannot apply the throttling
'''
if self.tags.throttling is None:
# No throttling configured for this detection
return self

missing_fields:list[str] = [field for field in self.tags.throttling.fields if field not in self.search]
if len(missing_fields) > 0:
raise ValueError(f"The following throttle fields were missing from the search: {missing_fields}")

else:
# All throttling fields present in search
return self



@model_validator(mode="after")
def ensureProperObservablesExist(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from contentctl.objects.deployment import Deployment
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.config import CustomApp

from contentctl.objects.enums import AnalyticsType
from contentctl.objects.constants import CONTENTCTL_MAX_STANZA_LENGTH
import abc
import uuid
import datetime
Expand All @@ -31,14 +33,14 @@

# TODO (#266): disable the use_enum_values configuration
class SecurityContentObject_Abstract(BaseModel, abc.ABC):
model_config = ConfigDict(use_enum_values=True, validate_default=True)

name: str = Field(...)
author: str = Field("Content Author", max_length=255)
date: datetime.date = Field(datetime.date.today())
version: NonNegativeInt = 1
id: uuid.UUID = Field(default_factory=uuid.uuid4) # we set a default here until all content has a uuid
description: str = Field("Enter Description Here", max_length=10000)
model_config = ConfigDict(use_enum_values=True,validate_default=True)
name: str = Field(...,max_length=99)
author: str = Field(...,max_length=255)
date: datetime.date = Field(...)
version: NonNegativeInt = Field(...)
id: uuid.UUID = Field(...) #we set a default here until all content has a uuid
description: str = Field(...,max_length=10000)
file_path: Optional[FilePath] = None
references: Optional[List[HttpUrl]] = None

Expand All @@ -56,7 +58,13 @@ def serialize_model(self):
"description": self.description,
"references": [str(url) for url in self.references or []]
}



def check_conf_stanza_max_length(self, stanza_name:str, max_stanza_length:int=CONTENTCTL_MAX_STANZA_LENGTH) -> None:
if len(stanza_name) > max_stanza_length:
raise ValueError(f"conf stanza may only be {max_stanza_length} characters, "
f"but stanza was actually {len(stanza_name)} characters: '{stanza_name}' ")

@staticmethod
def objectListToNameList(objects: list[SecurityContentObject]) -> list[str]:
return [object.getName() for object in objects]
Expand Down
Loading

0 comments on commit 506bbaf

Please sign in to comment.