Skip to content

Commit

Permalink
Add the possibility
Browse files Browse the repository at this point in the history
to automatically create
drilldowns. We will
likely remove this, but let's
keep it now for purposes
of discussion.
  • Loading branch information
pyth0n1c committed Sep 18, 2024
1 parent 72e3354 commit 9ba9300
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 7 deletions.
5 changes: 4 additions & 1 deletion contentctl/contentctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ def main():
t.__dict__.update(config.__dict__)
init_func(t)
elif type(config) == validate:
validate_func(config)
v=validate_func(config)
import code
code.interact(local=
locals())
elif type(config) == report:
report_func(config)
elif type(config) == build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from contentctl.objects.integration_test import IntegrationTest
from contentctl.objects.data_source import DataSource
from contentctl.objects.base_test_result import TestResultStatus

from contentctl.objects.drilldown import Drilldown
# from contentctl.objects.playbook import Playbook
from contentctl.objects.enums import ProvidingTechnology
from contentctl.enrichments.cve_enrichment import CveEnrichmentObj
Expand Down Expand Up @@ -73,6 +73,7 @@ class Detection_Abstract(SecurityContentObject):
test_groups: Union[list[TestGroup], None] = Field(None, validate_default=True)

data_source_objects: list[DataSource] = []
drilldown_searches: list[Drilldown] = Field(default=[], description="A list of Drilldowns that should be included with this search")

@field_validator("search", mode="before")
@classmethod
Expand Down Expand Up @@ -525,6 +526,15 @@ def model_post_init(self, __context: Any) -> None:
# Derive TestGroups and IntegrationTests, adjust for ManualTests, skip as needed
self.adjust_tests_and_groups()

#Add the default drilldown
#self.drilldown_searches.append(Drilldown.constructDrilldownFromDetection(self))

# Update the search fields with the original search, if required
for drilldown in self.drilldown_searches:
drilldown.perform_search_substitutions(self)
print("adding default drilldown?")
self.drilldown_searches.append(Drilldown.constructDrilldownFromDetection(self))

@field_validator('lookups', mode="before")
@classmethod
def getDetectionLookups(cls, v:list[str], info:ValidationInfo) -> list[Lookup]:
Expand Down
2 changes: 0 additions & 2 deletions contentctl/objects/detection_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
SecurityContentProductName
)
from contentctl.objects.atomic import AtomicTest
from contentctl.objects.drilldown import Drilldown
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE, CVE_TYPE

# TODO (#266): disable the use_enum_values configuration
Expand Down Expand Up @@ -113,7 +112,6 @@ def cis20(self) -> list[Cis18Value]:

# TODO (#268): Validate manual_test has length > 0 if not None
manual_test: Optional[str] = None
drilldown: Drilldown | None = None

# The following validator is temporarily disabled pending further discussions
# @validator('message')
Expand Down
34 changes: 31 additions & 3 deletions contentctl/objects/drilldown.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,34 @@
from __future__ import annotations
from pydantic import BaseModel, Field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from contentctl.objects.detection import Detection
from contentctl.objects.enums import AnalyticsType
SEARCH_PLACEHOLDER = "%original_detection_search%"

class Drilldown(BaseModel):
name: str = Field(...,min_length=5)
name: str = Field(..., description="The name of the drilldown search", min_length=5)
search: str = Field(..., description="The text of a drilldown search. This must be valid SPL.", min_length=1)
earliest_offset:str = "$info_min_time$"
latest_offset:str = "$info_max_time$"
earliest_offset:str = Field(default="$info_min_time$", description="Earliest offset time for the drilldown search", min_length= 1)
latest_offset:str = Field(default="$info_max_time$", description="Latest offset time for the driolldown search", min_length= 1)

@classmethod
def constructDrilldownFromDetection(cls, detection: Detection) -> Drilldown:
if len([f"${o.name}$" for o in detection.tags.observable if o.role[0] == "Victim"]) == 0 and detection.type != AnalyticsType.Hunting:
print("no victim!")
# print(detection.tags.observable)
# print(detection.file_path)
name_field = "View the detection results for " + ' and ' + ''.join([f"${o.name}$" for o in detection.tags.observable if o.type[0] == "Victim"])
search_field = f"{detection.search} | search " + ' '.join([f"o.name = ${o.name}$" for o in detection.tags.observable])
return cls(name=name_field, search=search_field)


def perform_search_substitutions(self, detection:Detection)->None:
if (self.search.count("%") % 2) or (self.search.count("$") % 2):
print("\n\nWarning - a non-even number of '%' or '$' characters were found in the\n"
f"drilldown search '{self.search}' for Detection {detection.file_path}.\n"
"If this was intentional, then please ignore this warning.\n")
self.search = self.search.replace(SEARCH_PLACEHOLDER, detection.search)



0 comments on commit 9ba9300

Please sign in to comment.