Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of updating offline caching and loading #276

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contentctl/actions/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:

director_output_dto = DirectorOutputDto(
AtomicTest.getAtomicTestsFromArtRepo(
repo_path=input_dto.getAtomicRedTeamRepoPath(),
repo_path=input_dto.atomic_red_team_repo_path,
enabled=input_dto.enrichments,
),
AttackEnrichment.getAttackEnrichment(input_dto),
Expand Down
3 changes: 2 additions & 1 deletion contentctl/contentctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def init_func(config:test):


def validate_func(config:validate)->DirectorOutputDto:
config.ensureEnrichmentReposPresent()
validate = Validate()
return validate.execute(config)

Expand Down Expand Up @@ -213,7 +214,7 @@ def main():
raise Exception(f"Unknown command line type '{type(config).__name__}'")
except Exception as e:
if config is None:
print("There was a serious issue where the config file could not be created.\n"
print("The config file could not be created.\n"
"The entire stack trace is provided below (please include it if filing a bug report).\n")
traceback.print_exc()
elif config.verbose:
Expand Down
271 changes: 156 additions & 115 deletions contentctl/enrichments/attack_enrichment.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@

from __future__ import annotations
import csv
import os
import sys
import pathlib
from attackcti import attack_client
import logging
from pydantic import BaseModel, Field
from dataclasses import field
from typing import Annotated,Any
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment
from pydantic import BaseModel, Field, ConfigDict
from contentctl.objects.config import validate
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE
logging.getLogger('taxii2client').setLevel(logging.CRITICAL)

from attackcti.models import Technique, Relationship, Group, GroupTechnique

class AttackEnrichment(BaseModel):
data: dict[str, MitreAttackEnrichment] = field(default_factory=dict)
data: dict[str, MitreAttackEnrichment] = Field(default_factory=dict)
use_enrichment:bool = True

@staticmethod
def getAttackEnrichment(config:validate)->AttackEnrichment:
enrichment = AttackEnrichment(use_enrichment=config.enrichments)
_ = enrichment.get_attack_lookup(str(config.path))
_ = enrichment.get_attack_lookup(config.mitre_cti_repo_path)
return enrichment

def getEnrichmentByMitreID(self, mitre_id:MITRE_ATTACK_ID_TYPE)->MitreAttackEnrichment:
Expand All @@ -34,120 +30,165 @@ def getEnrichmentByMitreID(self, mitre_id:MITRE_ATTACK_ID_TYPE)->MitreAttackEnri
else:
raise Exception(f"Error, Unable to find Mitre Enrichment for MitreID {mitre_id}")

def addMitreIDViaGroupNames(self, technique:dict, tactics:list[str], groupNames:list[str])->None:
technique_id = technique['technique_id']
technique_obj = technique['technique']
tactics.sort()

if technique_id in self.data:
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
mitre_attack_technique=technique_obj,
mitre_attack_tactics=tactics,
mitre_attack_groups=groupNames,
mitre_attack_group_objects=[])

def addMitreIDViaGroupObjects(self, technique:dict, tactics:list[str], groupObjects:list[dict[str,Any]])->None:
technique_id = technique['technique_id']
technique_obj = technique['technique']
tactics.sort()

groupNames:list[str] = sorted([group['group'] for group in groupObjects])

if technique_id in self.data:
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
mitre_attack_technique=technique_obj,
mitre_attack_tactics=tactics,
mitre_attack_groups=groupNames,
mitre_attack_group_objects=groupObjects)


def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cached_or_offline: bool = False, skip_enrichment:bool = False) -> dict:
def get_attack_lookup(self, input_path: pathlib.Path) -> None:
if not self.use_enrichment:
return {}
print("Getting MITRE Attack Enrichment Data. This may take some time...")
attack_lookup = dict()
file_path = os.path.join(input_path, "app_template", "lookups", "mitre_enrichment.csv")

if skip_enrichment is True:
print("Skipping enrichment")
return attack_lookup
try:

if force_cached_or_offline is True:
raise(Exception("WARNING - Using cached MITRE Attack Enrichment. Attack Enrichment may be out of date. Only use this setting for offline environments and development purposes."))
print(f"\r{'Client'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True)
lift = attack_client()
print(f"\r{'Client'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)
return None

print(f"\r{'Techniques'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True)
all_enterprise_techniques = lift.get_enterprise_techniques(stix_format=False)
print(f"Parsing MITRE Enrichment Data...", end="", flush=True)
try:
#First try to get the info from

print(f"\r{'Techniques'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)
lift = attack_client(local_paths={
"enterprise":str(input_path/"enterprise-attack"),
"mobile":str(input_path/"ics-attack"),
"ics":str(input_path/"mobile-attack")
})


from stix2.v20.sdo import AttackPattern


all_techniques = lift.get_techniques_used_by_all_groups(stix_format=True)
techs:list[GroupTechnique] = []
for gt in all_techniques:
techs.append(GroupTechnique.model_validate(gt))

print(f"\r{'Relationships'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True)
enterprise_relationships = lift.get_enterprise_relationships(stix_format=False)
print(f"\r{'Relationships'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)

#Get all enterprise techniques and construct into objects
'''
techniques:list[AttackPattern | dict[str,Any]] = lift.get_enterprise_techniques(stix_format=False)

print(f"\r{'Groups'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True)
enterprise_groups = lift.get_enterprise_groups(stix_format=False)
print(f"\r{'Groups'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)
enterprise_techniques:list[MitreEnterpriseTechnique] = []

for t in techniques:
#Field aliases have been set for from attackcti.models.Technique that we must fix
# t.update(
# {
# "name":t['technique']
# }
# )
Technique.model_validate(t)
enterprise_techniques.append(MitreEnterpriseTechnique.model_validate(t))

for index, technique in enumerate(all_enterprise_techniques):
progress_percent = ((index+1)/len(all_enterprise_techniques)) * 100
if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()):
print(f"\r\t{'MITRE Technique Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True)
apt_groups:list[dict[str,Any]] = []
for relationship in enterprise_relationships:
if (relationship['target_object'] == technique['id']) and relationship['source_object'].startswith('intrusion-set'):
for group in enterprise_groups:
if relationship['source_object'] == group['id']:
apt_groups.append(group)
#apt_groups.append(group['group'])

tactics = []
if ('tactic' in technique):
for tactic in technique['tactic']:
tactics.append(tactic.replace('-',' ').title())

self.addMitreIDViaGroupObjects(technique, tactics, apt_groups)
attack_lookup[technique['technique_id']] = {'technique': technique['technique'], 'tactics': tactics, 'groups': apt_groups}

if store_csv:
f = open(file_path, 'w')
writer = csv.writer(f)
writer.writerow(['mitre_id', 'technique', 'tactics' ,'groups'])
for key in attack_lookup.keys():
if len(attack_lookup[key]['groups']) == 0:
groups = 'no'
else:
groups = '|'.join(attack_lookup[key]['groups'])

writer.writerow([
key,
attack_lookup[key]['technique'],
'|'.join(attack_lookup[key]['tactics']),
groups
])

f.close()

except Exception as err:
print(f'\nError: {str(err)}')
print('Use local copy app_template/lookups/mitre_enrichment.csv')
with open(file_path, mode='r') as inp:
reader = csv.reader(inp)
attack_lookup = {rows[0]:{'technique': rows[1], 'tactics': rows[2].split('|'), 'groups': rows[3].split('|')} for rows in reader}
attack_lookup.pop('mitre_id')
for key in attack_lookup.keys():
technique_input = {'technique_id': key , 'technique': attack_lookup[key]['technique'] }
tactics_input = attack_lookup[key]['tactics']
groups_input = attack_lookup[key]['groups']
self.addMitreIDViaGroupNames(technique=technique_input, tactics=tactics_input, groups=groups_input)
#Get all relationships and parse into objects
relationships_dict = lift.get_enterprise_relationships(stix_format=False)
enterprise_relationships:list[MitreEnterpriseRelationship] = []
for t in relationships_dict:
#Field aliases have been set for from attackcti.models.Relationship that we must fix
t.update(
{
"relationship_type":t['relationship'],
"source_ref":t['source_object'],
"target_ref":t['target_object']
}
)
Relationship.model_validate(t)
enterprise_relationships.append(MitreEnterpriseRelationship.model_validate(t))
# We only care about intrusion-set relationships
enterprise_relationships = list(filter(lambda r: r.source_object.startswith('intrusion-set'), enterprise_relationships))

# Get all enterprise groups and parse into objects
groups_dict = lift.get_enterprise_groups(stix_format=False)
enterprise_groups: list[MitreAttackGroup] = []
for t in groups_dict:
#Field aliases have been set for from attackcti.models.Group that we must fix
t.update(
{
"name":t['group']
}
)
Group.model_validate(t)
enterprise_groups.append(MitreAttackGroup.model_validate(t))

# Using the relationships, assign the appropriate groups to each technique
for tech in enterprise_techniques:
tech.updateGroups(enterprise_relationships,enterprise_groups)
'''


except Exception as err:
print("ERROR")
raise Exception(f"Error getting MITRE Enrichment: {str(err)}")

print("done!")

self.data = MitreAttackEnrichment.constructFromEnrichment(techs)



class MitreAttackEnrichment(BaseModel):
ConfigDict(use_enum_values=True)
mitre_attack_id: MITRE_ATTACK_ID_TYPE = Field(...)
mitre_attack_technique: str = Field(...)
mitre_attack_tactics: list[str] = Field(...)
mitre_attack_groups: list[str] = Field(...)
#Exclude this field from serialization - it is very large and not useful in JSON objects
mitre_attack_group_objects: list[GroupTechnique] = Field(..., exclude=True)
def __hash__(self) -> int:
return id(self)

@classmethod
def constructFromEnrichment(cls, techniques_used_by_groups: list[GroupTechnique])->dict[MITRE_ATTACK_ID_TYPE, MitreAttackEnrichment]:
mapping: dict[MITRE_ATTACK_ID_TYPE, MitreAttackEnrichment] = {}
technique_ids = set([technique.technique_id for technique in techniques_used_by_groups])

for technique_id in sorted(technique_ids):
all_groups = [group_technique for group_technique in techniques_used_by_groups if group_technique.technique_id == technique_id]
# convert groups to proper pydantic groups due to field aliases
mapping[technique_id] = cls(
mitre_attack_id=technique_id,
mitre_attack_technique=all_groups[0].technique,
mitre_attack_tactics=all_groups[0].tactic,
mitre_attack_groups=[group.group for group in all_groups],
mitre_attack_group_objects=all_groups)

return mapping

'''
from pydantic import BaseModel

class GrandParent(BaseModel):
pass

class Parent(GrandParent):
name: str

class Child(Parent):
age: int

#Child has type child, as expected
child = Child(name="Tom", age=7)
print(type(child))
# <class 'Child'>

#Parent actually has type Child!
p = Parent.model_validate(child)
print(type(p))
# <class 'Child'>

# Grandparent also has type Child!
g = GrandParent.model_validate(child)
print(type(g))
# <class 'Child'>

pp = Parent(**child.dict())
type(pp)
#<class 'Parent'>



class Kidgparent():
pass
height=10

class Kidparent(Kidgparent):
def __init__(self, name:str):
self.name = name
class Kid(Kidparent):
def __init__(self, name:str, age:int):
self.age=age
super().__init__(name=name)

print("Done!")
return attack_lookup
kid = Kid(name="name", age=10)
'''
24 changes: 1 addition & 23 deletions contentctl/objects/atomic.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def getAtomicByAtomicGuid(cls, guid: UUID4, all_atomics:list[AtomicTest] | None)
@classmethod
def parseArtRepo(cls, repo_path:pathlib.Path)->List[AtomicFile]:
if not repo_path.is_dir():
print(f"WARNING: Atomic Red Team repo does NOT exist at {repo_path.absolute()}. You can check it out with:\n * git clone --single-branch https://github.com/redcanaryco/atomic-red-team. This will ONLY throw a validation error if you reference atomid_guids in your detection(s).")
print(f"WARNING: Atomic Red Team repo does NOT exist at {repo_path}. You can check it out with:\n * git clone --single-branch https://github.com/redcanaryco/atomic-red-team {repo_path}. This will ONLY throw a validation error if you reference atomid_guids in your detection(s).")
return []
atomics_path = repo_path/"atomics"
if not atomics_path.is_dir():
Expand Down Expand Up @@ -181,28 +181,6 @@ class AtomicFile(BaseModel):
display_name: str
atomic_tests: List[AtomicTest]




# ATOMICS_PATH = pathlib.Path("./atomics")
# atomic_objects = []
# atomic_simulations = []
# for obj_path in ATOMICS_PATH.glob("**/T*.yaml"):
# try:
# with open(obj_path, 'r', encoding="utf-8") as obj_handle:
# obj_data = yaml.load(obj_handle, Loader=yaml.CSafeLoader)
# atomic_obj = AtomicFile.model_validate(obj_data)
# except Exception as e:
# print(f"Error parsing object at path {obj_path}: {str(e)}")
# print(f"We have successfully parsed {len(atomic_objects)}, however!")
# sys.exit(1)

# print(f"Successfully parsed {obj_path}!")
# atomic_objects.append(atomic_obj)
# atomic_simulations += atomic_obj.atomic_tests

# print(f"Successfully parsed all {len(atomic_objects)} files!")
# print(f"Successfully parsed all {len(atomic_simulations)} simulations!")



Expand Down
Loading
Loading