From 6baff90c07228e8ad2aa5470eb7a41bb1dbe7dcc Mon Sep 17 00:00:00 2001 From: Felix Ortmann Date: Tue, 4 May 2021 17:14:45 +0200 Subject: [PATCH] Process indicators with STIX-Shifter --- apps/stix-shifter/config.yaml.example | 18 +++ .../stix_shifter_threatbus/shifter.py | 138 ++++++++++++++++-- 2 files changed, 144 insertions(+), 12 deletions(-) diff --git a/apps/stix-shifter/config.yaml.example b/apps/stix-shifter/config.yaml.example index a769be1f..20a0db3b 100644 --- a/apps/stix-shifter/config.yaml.example +++ b/apps/stix-shifter/config.yaml.example @@ -7,3 +7,21 @@ logging: threatbus: localhost:13370 snapshot: 30 +modules: + # for details on a module's options, please see https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#how-to-use + splunk: + max-results: 100 + connection: + host: localhost, + port: 8089, + selfSignedCert: false, + transmission: + auth: + username: admin + password: admin123 + translation: {} + data_source: + type: identity + identity_class: events + name: Splunk + id: identity--629a6400-8817-4bcb-aee7-8c74fc57482c diff --git a/apps/stix-shifter/stix_shifter_threatbus/shifter.py b/apps/stix-shifter/stix_shifter_threatbus/shifter.py index da473367..2912ebae 100755 --- a/apps/stix-shifter/stix_shifter_threatbus/shifter.py +++ b/apps/stix-shifter/stix_shifter_threatbus/shifter.py @@ -6,12 +6,20 @@ import coloredlogs import confuse import logging +import json import sys -from stix2 import parse +from stix2 import parse, Indicator, Bundle +from stix_shifter.stix_translation import stix_translation +from stix_shifter.stix_transmission import stix_transmission from threatbus.logger import setup as setup_logging_threatbus +from typing import Union +import warnings import zmq -logger_name = "zmq-app-template" +# Ignore warnings about SSL configuration in the user configs +warnings.filterwarnings("ignore") + +logger_name = "stix-shifter-threatbus" logger = logging.getLogger(logger_name) # list of all running async tasks of the bridge async_tasks = [] @@ -47,6 +55,7 @@ def setup_logging_with_config(config: confuse.Subview): """ global logger logger = setup_logging_threatbus(config, logger_name) + logging.getLogger("stix-shifter-utils").propagate = False def validate_config(config: confuse.Subview): @@ -54,6 +63,17 @@ def validate_config(config: confuse.Subview): config["threatbus"].get(str) config["snapshot"].get(int) + for mod in config["modules"].get(dict): + config["modules"][mod].get(dict) + config["modules"][mod]["max_results"].get(int) + config["modules"][mod]["connection"].get(dict) + config["modules"][mod]["data_source"].get(dict) + config["modules"][mod]["data_source"]["type"].get(str) + config["modules"][mod]["data_source"]["name"].get(str) + config["modules"][mod]["data_source"]["id"].get(str) + config["modules"][mod]["transmission"].add({}) # default to empty config + config["modules"][mod]["translation"].add({}) # default to empty config + def cancel_async_tasks(): """ @@ -163,7 +183,7 @@ async def heartbeat(endpoint: str, p2p_topic: str, interval: int = 5): ### --------------------------- The actual app logic --------------------------- -async def start(zmq_endpoint: str, snapshot: int): +async def start(zmq_endpoint: str, snapshot: int, modules_config: dict): """ Starts the template app. @param zmq_endpoint The ZMQ management endpoint of Threat Bus ('host:port') @@ -193,6 +213,14 @@ async def start(zmq_endpoint: str, snapshot: int): p2p_topic = topic atexit.register(unsubscribe, zmq_endpoint, topic) + # set task exception handler + loop = asyncio.get_event_loop() + + def exception_handler(loop, context): + logger.error(f"Error in async task: {context}") + + loop.set_exception_handler(exception_handler) + # Start a heartbeat task so we notice when the Threat Bus host goes away async_tasks.append( asyncio.create_task(heartbeat(zmq_endpoint, p2p_topic, interval=5)) @@ -203,7 +231,9 @@ async def start(zmq_endpoint: str, snapshot: int): async_tasks.append( asyncio.create_task(receive(pub_endpoint, p2p_topic, indicator_queue)) ) - async_tasks.append(asyncio.create_task(do_something_with_intel(indicator_queue))) + async_tasks.append( + asyncio.create_task(process_indicators(indicator_queue, modules_config)) + ) atexit.register(cancel_async_tasks) return await asyncio.gather(*async_tasks) @@ -244,20 +274,103 @@ async def receive(pub_endpoint: str, topic: str, indicator_queue: asyncio.Queue) await asyncio.sleep(0.01) # Free event loop for other tasks -async def do_something_with_intel(indicator_queue: asyncio.Queue): +async def process_indicators(indicator_queue: asyncio.Queue, modules_config: dict): """ - Does something with the received indicators. + Translates STIX-2 pattern and queries all configured modules via + STIX-Shifter. @param indicator_queue The queue to put arriving IoCs into """ while True: msg = await indicator_queue.get() - indicator = parse(msg, allow_custom=True) - logger.debug(f"Got indicator from Threat Bus: {indicator}") - # Do something with it, e.g., check its type and then start operating on - # the timestamps, the STIX pattern, ... + try: + indicator = parse(msg, allow_custom=True) + except Exception as e: + logger.error( + f"Error parsing indicator from Threat Bus. Expected STIX-2 Indicator: {msg}, {e}" + ) + indicator_queue.task_done() + continue + logger.debug( + f"Converting indicator from Threat Bus to module-specific query: {indicator}" + ) + for module, opts in modules_config.items(): + asyncio.create_task(query_indicator(indicator, module, opts)) indicator_queue.task_done() +async def query_indicator( + indicator: Indicator, module: str, opts: dict +) -> Union[None, Bundle]: + """ + Translates an indicator into a module-specific query and executes it. E.g., + if the module is `splunk`, the indicator's pattern is first translated into + a valid Splunk query and then executed via the Splunk REST API. + @param indicator The indicator to translate and query + @param module The module's name, e.g., `splunk` + @param opts The module configuration directly taken from the user-defined + configuration file `config.yaml` with which this app was started + """ + max_results = opts["max_results"] + connection_opts = opts["connection"] + transmission_opts = opts.get("transmission", {}) + translation_opts = opts.get("translation", {}) + data_source = opts["data_source"] + + ## Translate the pattern to a module-specific query. + translation = stix_translation.StixTranslation() + dsl = translation.translate( + module, "query", indicator, indicator.pattern, translation_opts + ) + if not dsl.get("queries", None): + logger.error( + f"Failed to translate STIX-2 indicator with ID '{indicator.id}' to query for module '{module}': {dsl}" + ) + return + logger.debug(f"Translated pattern to {module} query: {dsl}") + + ## Run the query against the configured endpoint for this module. + transmission = stix_transmission.StixTransmission( + module, connection_opts, transmission_opts + ) + query_results = [] + for query in dsl["queries"]: + search_result = transmission.query(query) + if not search_result["success"]: + logger.error(str(search_result)) + continue + + search_id = search_result["search_id"] + + if transmission.is_async(): + status = transmission.status(search_id) + if not status.get("success", None): + logger.error(f"Fetching query status failed for module '{module}'") + return + while status["progress"] < 100 and status["status"] == "RUNNING": + status = transmission.status(search_id) + await asyncio.sleep(0.05) + result = transmission.results(search_id, 0, max_results) + if result["success"]: + # Collect all results + query_results += result["data"] + else: + logger.error(f"Fetching results failed for module '{module}': {result}") + + ## Translate query_results to STIX. + if not query_results: + return + + stix_results = translation.translate( + module, + "results", + json.dumps(data_source), + json.dumps(query_results), + translation_opts, + ) + # TODO: parse output and report back sightings + logger.debug(f"STIX Results: {stix_results}") + + def main(): parser = argparse.ArgumentParser() parser.add_argument("--config", "-c", help="Path to a configuration file") @@ -271,7 +384,7 @@ def main(): # APPNAMEDIR env variable to overwrite search paths, i.e., in systemd # https://confit.readthedocs.io/en/latest/#search-paths # https://github.com/beetbox/confuse/blob/v1.4.0/confuse/core.py#L555 - config = confuse.Configuration("zmq_app_template") + config = confuse.Configuration("stix_shifter") config.set_args(args) if args.config: config.set_file(args.config) @@ -292,10 +405,11 @@ def main(): start( config["threatbus"].get(), config["snapshot"].get(), + config["modules"].get(dict), ) ) except asyncio.CancelledError: - logger.info("Restarting template app ...") + logger.info("Restarting STIX-Shifter Threat Bus app ...") if __name__ == "__main__":