Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
Process indicators with STIX-Shifter
Browse files Browse the repository at this point in the history
  • Loading branch information
0snap committed May 4, 2021
1 parent 196a43f commit 6baff90
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 12 deletions.
18 changes: 18 additions & 0 deletions apps/stix-shifter/config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,21 @@ logging:

threatbus: localhost:13370
snapshot: 30
modules:
# for details on a module's options, please see https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#how-to-use
splunk:
max-results: 100
connection:
host: localhost,
port: 8089,
selfSignedCert: false,
transmission:
auth:
username: admin
password: admin123
translation: {<Any required options specific to the particular data source>}
data_source:
type: identity
identity_class: events
name: Splunk
id: identity--629a6400-8817-4bcb-aee7-8c74fc57482c
138 changes: 126 additions & 12 deletions apps/stix-shifter/stix_shifter_threatbus/shifter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@
import coloredlogs
import confuse
import logging
import json
import sys
from stix2 import parse
from stix2 import parse, Indicator, Bundle
from stix_shifter.stix_translation import stix_translation
from stix_shifter.stix_transmission import stix_transmission
from threatbus.logger import setup as setup_logging_threatbus
from typing import Union
import warnings
import zmq

logger_name = "zmq-app-template"
# Ignore warnings about SSL configuration in the user configs
warnings.filterwarnings("ignore")

logger_name = "stix-shifter-threatbus"
logger = logging.getLogger(logger_name)
# list of all running async tasks of the bridge
async_tasks = []
Expand Down Expand Up @@ -47,13 +55,25 @@ def setup_logging_with_config(config: confuse.Subview):
"""
global logger
logger = setup_logging_threatbus(config, logger_name)
logging.getLogger("stix-shifter-utils").propagate = False


def validate_config(config: confuse.Subview):
assert config, "config must not be None"
config["threatbus"].get(str)
config["snapshot"].get(int)

for mod in config["modules"].get(dict):
config["modules"][mod].get(dict)
config["modules"][mod]["max_results"].get(int)
config["modules"][mod]["connection"].get(dict)
config["modules"][mod]["data_source"].get(dict)
config["modules"][mod]["data_source"]["type"].get(str)
config["modules"][mod]["data_source"]["name"].get(str)
config["modules"][mod]["data_source"]["id"].get(str)
config["modules"][mod]["transmission"].add({}) # default to empty config
config["modules"][mod]["translation"].add({}) # default to empty config


def cancel_async_tasks():
"""
Expand Down Expand Up @@ -163,7 +183,7 @@ async def heartbeat(endpoint: str, p2p_topic: str, interval: int = 5):
### --------------------------- The actual app logic ---------------------------


async def start(zmq_endpoint: str, snapshot: int):
async def start(zmq_endpoint: str, snapshot: int, modules_config: dict):
"""
Starts the template app.
@param zmq_endpoint The ZMQ management endpoint of Threat Bus ('host:port')
Expand Down Expand Up @@ -193,6 +213,14 @@ async def start(zmq_endpoint: str, snapshot: int):
p2p_topic = topic
atexit.register(unsubscribe, zmq_endpoint, topic)

# set task exception handler
loop = asyncio.get_event_loop()

def exception_handler(loop, context):
logger.error(f"Error in async task: {context}")

loop.set_exception_handler(exception_handler)

# Start a heartbeat task so we notice when the Threat Bus host goes away
async_tasks.append(
asyncio.create_task(heartbeat(zmq_endpoint, p2p_topic, interval=5))
Expand All @@ -203,7 +231,9 @@ async def start(zmq_endpoint: str, snapshot: int):
async_tasks.append(
asyncio.create_task(receive(pub_endpoint, p2p_topic, indicator_queue))
)
async_tasks.append(asyncio.create_task(do_something_with_intel(indicator_queue)))
async_tasks.append(
asyncio.create_task(process_indicators(indicator_queue, modules_config))
)

atexit.register(cancel_async_tasks)
return await asyncio.gather(*async_tasks)
Expand Down Expand Up @@ -244,20 +274,103 @@ async def receive(pub_endpoint: str, topic: str, indicator_queue: asyncio.Queue)
await asyncio.sleep(0.01) # Free event loop for other tasks


async def do_something_with_intel(indicator_queue: asyncio.Queue):
async def process_indicators(indicator_queue: asyncio.Queue, modules_config: dict):
"""
Does something with the received indicators.
Translates STIX-2 pattern and queries all configured modules via
STIX-Shifter.
@param indicator_queue The queue to put arriving IoCs into
"""
while True:
msg = await indicator_queue.get()
indicator = parse(msg, allow_custom=True)
logger.debug(f"Got indicator from Threat Bus: {indicator}")
# Do something with it, e.g., check its type and then start operating on
# the timestamps, the STIX pattern, ...
try:
indicator = parse(msg, allow_custom=True)
except Exception as e:
logger.error(
f"Error parsing indicator from Threat Bus. Expected STIX-2 Indicator: {msg}, {e}"
)
indicator_queue.task_done()
continue
logger.debug(
f"Converting indicator from Threat Bus to module-specific query: {indicator}"
)
for module, opts in modules_config.items():
asyncio.create_task(query_indicator(indicator, module, opts))
indicator_queue.task_done()


async def query_indicator(
indicator: Indicator, module: str, opts: dict
) -> Union[None, Bundle]:
"""
Translates an indicator into a module-specific query and executes it. E.g.,
if the module is `splunk`, the indicator's pattern is first translated into
a valid Splunk query and then executed via the Splunk REST API.
@param indicator The indicator to translate and query
@param module The module's name, e.g., `splunk`
@param opts The module configuration directly taken from the user-defined
configuration file `config.yaml` with which this app was started
"""
max_results = opts["max_results"]
connection_opts = opts["connection"]
transmission_opts = opts.get("transmission", {})
translation_opts = opts.get("translation", {})
data_source = opts["data_source"]

## Translate the pattern to a module-specific query.
translation = stix_translation.StixTranslation()
dsl = translation.translate(
module, "query", indicator, indicator.pattern, translation_opts
)
if not dsl.get("queries", None):
logger.error(
f"Failed to translate STIX-2 indicator with ID '{indicator.id}' to query for module '{module}': {dsl}"
)
return
logger.debug(f"Translated pattern to {module} query: {dsl}")

## Run the query against the configured endpoint for this module.
transmission = stix_transmission.StixTransmission(
module, connection_opts, transmission_opts
)
query_results = []
for query in dsl["queries"]:
search_result = transmission.query(query)
if not search_result["success"]:
logger.error(str(search_result))
continue

search_id = search_result["search_id"]

if transmission.is_async():
status = transmission.status(search_id)
if not status.get("success", None):
logger.error(f"Fetching query status failed for module '{module}'")
return
while status["progress"] < 100 and status["status"] == "RUNNING":
status = transmission.status(search_id)
await asyncio.sleep(0.05)
result = transmission.results(search_id, 0, max_results)
if result["success"]:
# Collect all results
query_results += result["data"]
else:
logger.error(f"Fetching results failed for module '{module}': {result}")

## Translate query_results to STIX.
if not query_results:
return

stix_results = translation.translate(
module,
"results",
json.dumps(data_source),
json.dumps(query_results),
translation_opts,
)
# TODO: parse output and report back sightings
logger.debug(f"STIX Results: {stix_results}")


def main():
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", help="Path to a configuration file")
Expand All @@ -271,7 +384,7 @@ def main():
# APPNAMEDIR env variable to overwrite search paths, i.e., in systemd
# https://confit.readthedocs.io/en/latest/#search-paths
# https://github.com/beetbox/confuse/blob/v1.4.0/confuse/core.py#L555
config = confuse.Configuration("zmq_app_template")
config = confuse.Configuration("stix_shifter")
config.set_args(args)
if args.config:
config.set_file(args.config)
Expand All @@ -292,10 +405,11 @@ def main():
start(
config["threatbus"].get(),
config["snapshot"].get(),
config["modules"].get(dict),
)
)
except asyncio.CancelledError:
logger.info("Restarting template app ...")
logger.info("Restarting STIX-Shifter Threat Bus app ...")


if __name__ == "__main__":
Expand Down

0 comments on commit 6baff90

Please sign in to comment.