diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f8c0528..d8d81be6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,11 @@ Every entry has a category for which we use the following visual abbreviations: Threat Bus. Starting without it will print a helpful error message. [#119](https://github.com/tenzir/threatbus/pull/119) +- 🎁 We now provide a simple asyncio + [template](https://github.com/tenzir/threatbus/tree/master/apps/zmq-app-template) + for writing applications that connect to Threat Bus via ZeroMQ. + [#118](https://github.com/tenzir/threatbus/pull/118) + - ⚠️ The `threatbus-zeek` plugin now uses the timestamp of Zeek intel matches to set the `last_seen` property of resulting STIX-2 Sightings, instead of setting the `created` timestamp. The `created` timestamp now always refers to the diff --git a/Makefile b/Makefile index 5728b262..b97a1b78 100644 --- a/Makefile +++ b/Makefile @@ -25,6 +25,7 @@ unit-tests: $(MAKE) -C plugins/apps/threatbus_zeek unit-tests $(MAKE) -C plugins/apps/threatbus_cif3 unit-tests $(MAKE) -C apps/vast unit-tests + $(MAKE) -C apps/stix-shifter unit-tests .PHONY: integration-tests integration-tests: @@ -50,6 +51,7 @@ clean: -$(MAKE) -C plugins/backbones/threatbus_inmem clean -$(MAKE) -C plugins/backbones/threatbus_rabbitmq clean -$(MAKE) -C apps/vast clean + -$(MAKE) -C apps/stix-shifter clean .PHONY: build build: @@ -61,6 +63,7 @@ build: $(MAKE) -C plugins/backbones/threatbus_inmem build $(MAKE) -C plugins/backbones/threatbus_rabbitmq build $(MAKE) -C apps/vast build + $(MAKE) -C apps/stix-shifter build .PHONY: dist dist: @@ -72,6 +75,7 @@ dist: $(MAKE) -C plugins/backbones/threatbus_inmem dist $(MAKE) -C plugins/backbones/threatbus_rabbitmq dist $(MAKE) -C apps/vast dist + $(MAKE) -C apps/stix-shifter dist .PHONY: install install: @@ -83,6 +87,7 @@ install: $(MAKE) -C plugins/backbones/threatbus_inmem install $(MAKE) -C plugins/backbones/threatbus_rabbitmq install $(MAKE) -C apps/vast install + $(MAKE) -C apps/stix-shifter install .PHONY: dev-mode dev-mode: @@ -92,5 +97,6 @@ dev-mode: $(MAKE) -C plugins/backbones/threatbus_rabbitmq dev-mode $(MAKE) -C plugins/apps/threatbus_misp dev-mode $(MAKE) -C plugins/apps/threatbus_zeek dev-mode - $(MAKE) -C apps/vast dev-mode $(MAKE) -C plugins/apps/threatbus_cif3 dev-mode + $(MAKE) -C apps/vast dev-mode + $(MAKE) -C apps/stix-shifter dev-mode diff --git a/apps/stix-shifter/CHANGELOG.md b/apps/stix-shifter/CHANGELOG.md new file mode 100644 index 00000000..017701da --- /dev/null +++ b/apps/stix-shifter/CHANGELOG.md @@ -0,0 +1,24 @@ +# Changelog + +This changelog documents all notable user-facing changes of +`stix-shifter-threatbus`. + +Every entry has a category for which we use the following visual abbreviations: + +- 🎁 Features +- 🧬 Experimental Features +- ⚠️ Changes +- ⚡️ Breaking Changes +- 🐞 Bug Fixes + +## Unreleased + +- 🎁 `stix-shifter-threatbus` has come to life. This stand-alone application + connects to Threat Bus via ZeroMQ and bridges the gap between Threat Bus and + commercial security tools, like + [IBM QRadar](https://www.ibm.com/security/security-intelligence/qradar) or + [Splunk](https://www.splunk.com/). `stix-shifter-threatbus` uses + [STIX-Shifter](https://github.com/opencybersecurityalliance/stix-shifter) to + first translate STIX-2 Indicators to native queries for commercial tools and + then execute these queries to log the results. + [#118](https://github.com/tenzir/threatbus/pull/118) diff --git a/apps/stix-shifter/Makefile b/apps/stix-shifter/Makefile new file mode 100644 index 00000000..304558c1 --- /dev/null +++ b/apps/stix-shifter/Makefile @@ -0,0 +1,36 @@ +colon := : +$(colon) := : + +.PHONY: all +all: format build dist test + +.PHONY: format +format: + python -m black . + +.PHONY: test +test: unit-tests + +.PHONY: unit-tests +unit-tests: + python -m unittest discover . + +.PHONY: clean +clean: + ${RM} -r __pycache__ *egg-info build dist + +.PHONY: build +build: + python setup.py build + +.PHONY: dist +dist: + python setup.py sdist bdist_wheel + +.PHONY: install +install: + pip install . + +.PHONY: dev-mode +dev-mode: + pip install --editable . diff --git a/apps/stix-shifter/README.md b/apps/stix-shifter/README.md new file mode 100644 index 00000000..ae4b7d0b --- /dev/null +++ b/apps/stix-shifter/README.md @@ -0,0 +1,84 @@ +STIX-Shifter Threat Bus +======================= + +This app bridges the gap between Threat Bus and various security tools by +leveraging +[STIX-Shifter](https://github.com/opencybersecurityalliance/stix-shifter). + +STIX-Shifter is a tool and library to transform STIX patterns into native +queries for a variety of (mostly commercial) security tools, like +[IBM QRadar](https://www.ibm.com/security/security-intelligence/qradar) or +[Splunk](https://www.splunk.com/). This app connects STIX-Shifter with Threat +Bus and provides a simple way to communicate with the commercial tools of your +choice via Threat Bus. + +## How It Works + +The `stix-shifter-threatbus` app uses ZeroMQ to connect with Threat Bus. To +connect via ZeroMQ, users must first install and configure the +[`threatbus-zmq-app`](https://pypi.org/project/threatbus-zmq-app/) plugin on +their Threat Bus host. + +This app functions as middleman between Threat Bus and security tools supported +by STIX-Shifter. It subscribes to indicator updates from the bus and uses +STIX-Shifter to actively translate STIX-2 intelligence to native queries. +The app then executes these queries via STIX-Shifter. [Result processing +is yet to be implemented.] + +## Quick Start + +You can configure the app via a YAML configuration file. See +`config.yaml.example` for an example config file. + +Install `stix-shifter-threatbus` in a virtualenv and start it by passing a +config file: + +```sh +python -m venv venv +source venv/bin/activate +make dev-mode +stix-shifter-threatbus -c config.yaml +``` + +## Configuration + +Apart from the logging section, which is self-explanatory, users need to +configure the `threatbus` endpoint of the ZerMQ-App plugin and an optional +`snapshot` of historic threat intel data they want to fetch. + +Additionally, users must configure each STIX-Shifter module individually to use +it with this app. You also must install the corresponding modules according to +your configuration. For example, if you configure a key `splunk` in the +`modules` section, you must install the `stix-shifter-modules-splunk`. Otherwise +the app will throw an error. See below for an example: + +``` +threatbus: localhost:13370 # connect with Threat Bus via this endpoint +snapshot: 300 # request 300 days of historic indicators +modules: + # for details on a module's options, please see https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#how-to-use + # to use the key `splunk` you must install `stix-shifter-modules-splunk` + # same goes for any other key, e.g., `elastic`, `qradar`, etc... + splunk: + max_results: 100 # limit the number of events queried by STIX-Shifter + # https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#connection + connection: + host: localhost + port: 8089 # Management port + selfSignedCert: false + # https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#configuration + transmission: + auth: + username: admin + password: admin123 + # https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#translate + translation: # {} + # The data_source is a STIX-2 DataSource (e.g., an `identity`) and is used + # to create a STIX bundle with the queried results. You configure it here + # and only once for this module. + data_source: + type: identity + identity_class: events + name: Splunk + id: identity--629a6400-8817-4bcb-aee7-8c74fc57482c +``` diff --git a/apps/stix-shifter/config.yaml.example b/apps/stix-shifter/config.yaml.example new file mode 100644 index 00000000..00ed8cff --- /dev/null +++ b/apps/stix-shifter/config.yaml.example @@ -0,0 +1,35 @@ +logging: + console: true + console_verbosity: DEBUG + file: true + file_verbosity: DEBUG + filename: stix-shifter.log + +threatbus: localhost:13370 +snapshot: 30 +modules: + # for details on a module's options, please see https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#how-to-use + # to use the key `splunk` you must install `stix-shifter-modules-splunk` + # same goes for any other key, e.g., `elastic`, `qradar`, etc... + splunk: + max_results: 100 # limit the number of events queried by STIX-Shifter + # https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#connection + connection: + host: localhost + port: 8089 # Management port + selfSignedCert: false + # https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#configuration + transmission: + auth: + username: admin + password: admin123 + # https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#translate + translation: # {} + # The data_source is a STIX-2 DataSource (e.g., an `identity`) and is used + # to create a STIX bundle with the queried results. You configure it here + # and only once for this module. + data_source: + type: identity + identity_class: events + name: Splunk + id: identity--629a6400-8817-4bcb-aee7-8c74fc57482c diff --git a/apps/stix-shifter/setup.py b/apps/stix-shifter/setup.py new file mode 100644 index 00000000..ce4c30f2 --- /dev/null +++ b/apps/stix-shifter/setup.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from setuptools import setup + +with open("README.md", "r") as fh: + long_description = fh.read() + +setup( + author="Tenzir", + author_email="engineering@tenzir.com", + classifiers=[ + # https://pypi.org/classifiers/ + "Development Status :: 3 - Alpha", + "License :: OSI Approved :: BSD License", + "Operating System :: MacOS :: MacOS X", + "Operating System :: POSIX :: Linux", + "Topic :: Scientific/Engineering :: Information Analysis", + "Topic :: Scientific/Engineering :: Interface Engine/Protocol Translator", + "Topic :: Security", + "Topic :: Software Development :: Object Brokering", + "Topic :: System :: Distributed Computing", + ], + description="Bridges the gap between Threat Bus and STIX-Shifter", + entry_points={ + "console_scripts": [ + "stix-shifter-threatbus=stix_shifter_threatbus.shifter:main" + ] + }, + include_package_data=True, + install_requires=[ + "black >= 19.10b", + "coloredlogs >= 14.0", + "confuse", + "pyzmq >= 19", + "stix2 >= 2.1", + "stix-shifter >= 3.4.2", + "stix-shifter-utils >= 3.4.2", + "threatbus >= 2021.4.29", + ], + keywords=[ + "open source", + "threatbus", + "Threat Bus", + "threat intelligence", + "TI", + "TI dissemination", + ], + license="BSD 3-clause", + long_description=long_description, + long_description_content_type="text/markdown", + name="stix-shifter-threatbus", + packages=["stix_shifter_threatbus"], + python_requires=">=3.7", + setup_requires=["setuptools", "wheel"], + url="https://github.com/tenzir/threatbus", + version="2021.05.27", +) diff --git a/apps/stix-shifter/stix_shifter_threatbus/__init__.py b/apps/stix-shifter/stix_shifter_threatbus/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/stix-shifter/stix_shifter_threatbus/shifter.py b/apps/stix-shifter/stix_shifter_threatbus/shifter.py new file mode 100755 index 00000000..f8b4d2ca --- /dev/null +++ b/apps/stix-shifter/stix_shifter_threatbus/shifter.py @@ -0,0 +1,438 @@ +#!/usr/bin/env python3 + +import argparse +import asyncio +import atexit +import coloredlogs +import confuse +import json +import logging +import signal +from stix2 import parse, Indicator, Bundle +from stix_shifter.stix_translation import stix_translation +from stix_shifter.stix_transmission import stix_transmission +import sys +from threatbus.logger import setup as setup_logging_threatbus +from typing import Union +import warnings +import zmq + +# Ignore warnings about SSL configuration in the user configs. +warnings.filterwarnings("ignore") + +logger_name = "stix-shifter-threatbus" +logger = logging.getLogger(logger_name) +# List of all running async tasks of the bridge. +async_tasks = [] +# The p2p topic sent back by Threat Bus upon successful subscription. +p2p_topic = None +# Boolean flag indicating that the user has issued a SIGNAL (e.g., SIGTERM). +user_exit = False + +### --------------------------- Application helpers --------------------------- + + +def setup_logging_with_level(level: str): + """ + Sets up a the global logger for console logging with the given loglevel. + @param level The loglevel to use, e.g., "DEBUG" + """ + global logger + log_level = logging.getLevelName(level.upper()) + + fmt = "%(asctime)s %(levelname)-8s %(message)s" + colored_formatter = coloredlogs.ColoredFormatter(fmt) + + handler = logging.StreamHandler() + handler.setLevel(log_level) + if logger.level > log_level or logger.level == 0: + logger.setLevel(log_level) + handler.setFormatter(colored_formatter) + logger.addHandler(handler) + + +def setup_logging_with_config(config: confuse.Subview): + """ + Sets up the global logger as configured in the `config` object. + @param config The user-defined logging configuration + """ + global logger + logger = setup_logging_threatbus(config, logger_name) + logging.getLogger("stix-shifter-utils").propagate = False + + +def validate_config(config: confuse.Subview): + assert config, "config must not be None" + config["threatbus"].get(str) + config["snapshot"].get(int) + + for mod in config["modules"].get(dict): + config["modules"][mod].get(dict) + config["modules"][mod]["max_results"].get(int) + config["modules"][mod]["connection"].get(dict) + config["modules"][mod]["data_source"].get(dict) + config["modules"][mod]["data_source"]["type"].get(str) + config["modules"][mod]["data_source"]["name"].get(str) + config["modules"][mod]["data_source"]["id"].get(str) + config["modules"][mod]["transmission"].add({}) # default to empty config + config["modules"][mod]["translation"].add({}) # default to empty config + + +async def cancel_async_tasks(): + """ + Cancels all async tasks. + """ + global async_tasks + for task in async_tasks: + if task is not asyncio.current_task(): + task.cancel() + del task + async_tasks = [] + return await asyncio.gather(*async_tasks) + + +async def stop_signal(): + """ + Implements Python's asyncio eventloop signal handler + https://docs.python.org/3/library/asyncio-eventloop.html + Cancels all running tasks and exits the app. + """ + global user_exit + user_exit = True + await cancel_async_tasks() + + +### --------------- ZeroMQ communication / management functions --------------- + + +def send_manage_message(endpoint: str, action: dict, timeout: int = 5): + """ + Sends a 'management' message, following the threatbus-zmq-app protocol to + either subscribe or unsubscribe this application to/from Threat Bus. + @param endpoint A host:port string to connect to via ZeroMQ + @param action The message to send as JSON + @param timeout The period after which the connection attempt is aborted + """ + context = zmq.Context() + socket = context.socket(zmq.REQ) + socket.setsockopt(zmq.LINGER, 0) + socket.connect(f"tcp://{endpoint}") + socket.send_json(action) + poller = zmq.Poller() + poller.register(socket, zmq.POLLIN) + + reply = None + if poller.poll(timeout * 1000): + reply = socket.recv_json() + socket.close() + context.term() + return reply + + +def reply_is_success(reply: dict): + """ + Predicate to check if `reply` is a dict and contains the key-value pair + "status" = "success" + @param reply A python dict + @return True if the dict contains "status" = "success" + """ + return ( + reply + and type(reply) is dict + and reply.get("status", None) + and reply["status"] == "success" + ) + + +def subscribe(endpoint: str, topic: str, snapshot: int, timeout: int = 5): + """ + Subscribes this app to Threat Bus for the given topic. Requests an optional + snapshot of historical indicators. + @param endpoint The ZMQ management endpoint of Threat Bus ('host:port') + @param topic The topic to subscribe to + @param snapshot An integer value to request n days of historical IoC items + @param timeout The period after which the connection attempt is aborted + """ + global logger + logger.info(f"Subscribing to topic '{topic}'...") + action = {"action": "subscribe", "topic": topic, "snapshot": snapshot} + return send_manage_message(endpoint, action, timeout) + + +def unsubscribe(endpoint: str, topic: str, timeout: int = 5): + """ + Unsubscribes this app from Threat Bus for the given topic. + @param endpoint The ZMQ management endpoint of Threat Bus + @param topic The topic to unsubscribe from + @param timeout The period after which the connection attempt is aborted + """ + global logger + logger.info(f"Unsubscribing from topic '{topic}' ...") + action = {"action": "unsubscribe", "topic": topic} + reply = send_manage_message(endpoint, action, timeout) + if not reply_is_success(reply): + logger.warning("Unsubscription failed") + return + logger.info("Unsubscription successful") + + +async def heartbeat(endpoint: str, p2p_topic: str, interval: int = 5): + """ + Sends heartbeats to Threat Bus periodically to check if the given p2p_topic + is still valid at the Threat Bus host. Cancels all async tasks of this app + when the heartbeat fails and stops the heartbeat. + @param endpoint The ZMQ management endpoint of Threat Bus + @param p2p_topic The topic string to include in the heartbeat + @param timeout The period after which the connection attempt is aborted + """ + global logger + action = {"action": "heartbeat", "topic": p2p_topic} + while True: + reply = send_manage_message(endpoint, action, interval) + if not reply_is_success(reply): + logger.error("Subscription with Threat Bus host became invalid") + return await cancel_async_tasks() + await asyncio.sleep(interval) + + +### --------------------------- The actual app logic --------------------------- + + +async def start(zmq_endpoint: str, snapshot: int, modules_config: dict): + """ + Starts the STIX-Shifter Threat Bus app. + @param zmq_endpoint The ZMQ management endpoint of Threat Bus ('host:port') + @param snapshot An integer value to request n days of historical IoC items + @param modules_config User-provided configuration for STIX-Shifter modules + """ + global logger, async_tasks, p2p_topic + # needs to be created inside the same eventloop where it is used + logger.debug(f"Calling Threat Bus management endpoint {zmq_endpoint}") + reply = subscribe(zmq_endpoint, "stix2/indicator", snapshot) + if not reply_is_success(reply): + logger.error("Subscription failed") + return + pub_endpoint = reply.get("pub_endpoint", None) + sub_endpoint = reply.get("sub_endpoint", None) + topic = reply.get("topic", None) + if not pub_endpoint or not sub_endpoint or not topic: + logger.error("Subscription failed") + return + + logger.info(f"Subscription successful. New p2p_topic: {topic}") + if p2p_topic: + # The 'start' function is called as result of a restart + # Unsubscribe the old topic as soon as we get a working connection + logger.info("Cleaning up old p2p_topic subscription ...") + unsubscribe(zmq_endpoint, p2p_topic) + atexit.unregister(unsubscribe) + p2p_topic = topic + atexit.register(unsubscribe, zmq_endpoint, topic) + + # set task exception handler + loop = asyncio.get_event_loop() + + def exception_handler(loop, context): + logger.error(f"Error in async task: {context}") + + loop.set_exception_handler(exception_handler) + + # Start a heartbeat task so we notice when the Threat Bus host goes away + async_tasks.append( + asyncio.create_task(heartbeat(zmq_endpoint, p2p_topic, interval=5)) + ) + + # Start a receive task to retrieve real-time updates from Threat Bus + indicator_queue = asyncio.Queue() + async_tasks.append( + asyncio.create_task(receive(pub_endpoint, p2p_topic, indicator_queue)) + ) + async_tasks.append( + asyncio.create_task(process_indicators(indicator_queue, modules_config)) + ) + + loop = asyncio.get_event_loop() + for s in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]: + loop.add_signal_handler(s, lambda: asyncio.create_task(stop_signal())) + return await asyncio.gather(*async_tasks) + + +async def receive(pub_endpoint: str, topic: str, indicator_queue: asyncio.Queue): + """ + Starts a zmq subscriber on the given endpoint and listens for new messages + that are published on the given topic (zmq prefix matching). Depending on + the topic suffix, Indicators are enqueued to the indicator_queue. + @param pub_endpoint A host:port string to connect to via zmq + @param topic The topic prefix to subscribe to intelligence items + @param indicator_queue The queue to put arriving IoCs into + """ + global logger + socket = zmq.Context().socket(zmq.SUB) + socket.connect(f"tcp://{pub_endpoint}") + socket.setsockopt(zmq.SUBSCRIBE, topic.encode()) + poller = zmq.Poller() + poller.register(socket, zmq.POLLIN) + logger.info(f"Receiving via ZMQ on topic {pub_endpoint}/{topic}") + while True: + socks = dict(poller.poll(timeout=100)) # Smaller timeouts increase CPU load + if socket in socks and socks[socket] == zmq.POLLIN: + try: + topic, msg = socket.recv().decode().split(" ", 1) + except Exception as e: + logger.error(f"Error decoding message: {e}") + continue + # The topic is suffixed with the message type. Use it for filtering + if not topic.endswith("indicator"): + logger.debug(f"Skipping unsupported message: {msg}") + continue + # Put the message into the queue for incoming intel items, so they + # can be processed asynchronously + await indicator_queue.put(msg) + else: + await asyncio.sleep(0.01) # Free event loop for other tasks + + +async def process_indicators(indicator_queue: asyncio.Queue, modules_config: dict): + """ + Translates STIX-2 pattern and queries all configured modules via + STIX-Shifter. + @param indicator_queue The queue to put arriving IoCs into + @param modules_config User-provided configuration for STIX-Shifter modules + """ + while True: + msg = await indicator_queue.get() + try: + indicator = parse(msg, allow_custom=True) + except Exception as e: + logger.error( + f"Error parsing indicator from Threat Bus. Expected STIX-2 Indicator: {msg}, {e}" + ) + indicator_queue.task_done() + continue + logger.debug( + f"Converting indicator from Threat Bus to module-specific query: {indicator}" + ) + for module, opts in modules_config.items(): + asyncio.create_task(query_indicator(indicator, module, opts)) + indicator_queue.task_done() + + +async def query_indicator(indicator: Indicator, module: str, opts: dict): + """ + Translates an indicator into a module-specific query and executes it. E.g., + if the module is `splunk`, the indicator's pattern is first translated into + a valid Splunk query and then executed via the Splunk REST API. + @param indicator The indicator to translate and query + @param module The module's name, e.g., `splunk` + @param opts The module configuration directly taken from the user-defined + configuration file `config.yaml` with which this app was started + """ + max_results = opts["max_results"] + connection_opts = opts["connection"] + transmission_opts = opts.get("transmission", {}) + translation_opts = opts.get("translation", {}) + data_source = opts["data_source"] + + ## Translate the pattern to a module-specific query. + translation = stix_translation.StixTranslation() + dsl = translation.translate( + module, "query", indicator, indicator.pattern, translation_opts + ) + if not dsl.get("queries", None): + logger.error( + f"Failed to translate STIX-2 indicator with ID '{indicator.id}' to query for module '{module}': {dsl}" + ) + return + logger.debug(f"Translated pattern to {module} query: {dsl}") + + ## Run the query against the configured endpoint for this module. + transmission = stix_transmission.StixTransmission( + module, connection_opts, transmission_opts + ) + query_results = [] + for query in dsl["queries"]: + search_result = transmission.query(query) + if not search_result["success"]: + logger.error(str(search_result)) + continue + + search_id = search_result["search_id"] + + if transmission.is_async(): + status = transmission.status(search_id) + if not status.get("success", None): + logger.error(f"Fetching query status failed for module '{module}'") + return + while status["progress"] < 100 and status["status"] == "RUNNING": + status = transmission.status(search_id) + await asyncio.sleep(0.05) + result = transmission.results(search_id, 0, max_results) + if result["success"]: + # Collect all results + query_results += result["data"] + else: + logger.error(f"Fetching results failed for module '{module}': {result}") + + ## Translate query_results to STIX. + if not query_results: + return + + stix_results = translation.translate( + module, + "results", + json.dumps(data_source), + json.dumps(query_results), + translation_opts, + ) + # TODO: parse output and report back sightings + logger.debug(f"STIX Results: {stix_results}") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--config", "-c", help="Path to a configuration file") + args = parser.parse_args() + + # Note that you must use names without dashes, use underscores instead for + # `confuse` to work without errors. + # Confuse uses the configuration name to lookup environment variables, but + # it simply upper-cases that name. Dashes are not replaced properly. Using a + # dash in the configuration name makes it impossible to configure the + # APPNAMEDIR env variable to overwrite search paths, i.e., in systemd + # https://confit.readthedocs.io/en/latest/#search-paths + # https://github.com/beetbox/confuse/blob/v1.4.0/confuse/core.py#L555 + config = confuse.Configuration("stix_shifter") + config.set_args(args) + if args.config: + config.set_file(args.config) + + try: + validate_config(config) + except Exception as e: + sys.exit(ValueError(f"Invalid config: {e}")) + + if config["logging"].get(dict): + setup_logging_with_config(config["logging"]) + else: + setup_logging_with_level(config["loglevel"].get(str)) + + while True: + try: + asyncio.run( + start( + config["threatbus"].get(), + config["snapshot"].get(), + config["modules"].get(dict), + ) + ) + except (KeyboardInterrupt, SystemExit): + return + except asyncio.CancelledError: + if user_exit: + # Tasks were cancelled because the user stopped the app. + return + logger.info("Restarting stix-shifter-threatbus ...") + + +if __name__ == "__main__": + main() diff --git a/apps/vast/CHANGELOG.md b/apps/vast/CHANGELOG.md index 5689381f..38444797 100644 --- a/apps/vast/CHANGELOG.md +++ b/apps/vast/CHANGELOG.md @@ -12,6 +12,11 @@ Every entry has a category for which we use the following visual abbreviations: ## Unreleased +- ⚠️ All Threat Bus apps that connect via ZeroMQ like `pyvast-threatbus`. now + shutdown gracefully and do not longer print a stack trace when receiving any + stop signal. + [#118](https://github.com/tenzir/threatbus/pull/118) + - ⚠️ The `-c` / `--config` parameter is now explicitly required to start `pyvast-threatbus`. Starting without it will print a helpful error message. [#119](https://github.com/tenzir/threatbus/pull/119) diff --git a/apps/vast/pyvast_threatbus/pyvast_threatbus.py b/apps/vast/pyvast_threatbus/pyvast_threatbus.py index 5f9d1f4f..e2a9cb54 100755 --- a/apps/vast/pyvast_threatbus/pyvast_threatbus.py +++ b/apps/vast/pyvast_threatbus/pyvast_threatbus.py @@ -16,6 +16,7 @@ ) from pyvast import VAST import random +import signal from shlex import split as lexical_split import socket from string import ascii_lowercase as letters @@ -32,13 +33,15 @@ logger_name = "pyvast-threatbus" logger = logging.getLogger(logger_name) matcher_name = None -# list of all running async tasks of the bridge +# List of all running async tasks of the bridge. async_tasks = [] -# the p2p topic that was given to the vast-bridge upon successful subscription +# The p2p topic that was given to the vast-bridge upon successful subscription. p2p_topic = None max_open_tasks = None +# Boolean flag indicating that the user has issued a SIGNAL (e.g., SIGTERM). +user_exit = False -# metric definitions +# Metric definitions. metrics = [] g_iocs_added = Gauge("added_iocs") g_iocs_removed = Gauge("removed_iocs") @@ -107,15 +110,28 @@ def validate_config(config: confuse.Subview): config["metrics"]["filename"].get(str) -def cancel_async_tasks(): +async def cancel_async_tasks(): """ Cancels all async tasks. """ global async_tasks for task in async_tasks: - task.cancel() - del task + if task is not asyncio.current_task(): + task.cancel() + del task async_tasks = [] + return await asyncio.gather(*async_tasks) + + +async def stop_signal(): + """ + Implements Python's asyncio eventloop signal handler + https://docs.python.org/3/library/asyncio-eventloop.html + Cancels all running tasks and exits the app. + """ + global user_exit + user_exit = True + await cancel_async_tasks() async def start( @@ -227,7 +243,9 @@ async def start( asyncio.create_task(write_metrics(metrics_interval, metrics_filename)) ) - atexit.register(cancel_async_tasks) + loop = asyncio.get_event_loop() + for s in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]: + loop.add_signal_handler(s, lambda: asyncio.create_task(stop_signal())) return await asyncio.gather(*async_tasks) @@ -730,14 +748,13 @@ async def heartbeat(endpoint: str, p2p_topic: str, interval: int = 5): @param p2p_topic The topic string to include in the heartbeat @param timeout The period after which the connection attempt is aborted """ - global logger, async_tasks + global logger action = {"action": "heartbeat", "topic": p2p_topic} while True: reply = send_manage_message(endpoint, action, interval) if not reply_is_success(reply): logger.error("Subscription with Threat Bus host became invalid") - cancel_async_tasks() - return + return await cancel_async_tasks() await asyncio.sleep(interval) @@ -790,7 +807,12 @@ def main(): config["sink"].get(), ) ) + except (KeyboardInterrupt, SystemExit): + return except asyncio.CancelledError: + if user_exit: + # Tasks were cancelled because the user stopped the app. + return logger.info("Restarting pyvast-threatbus ...") diff --git a/apps/zmq-app-template/Makefile b/apps/zmq-app-template/Makefile new file mode 100644 index 00000000..304558c1 --- /dev/null +++ b/apps/zmq-app-template/Makefile @@ -0,0 +1,36 @@ +colon := : +$(colon) := : + +.PHONY: all +all: format build dist test + +.PHONY: format +format: + python -m black . + +.PHONY: test +test: unit-tests + +.PHONY: unit-tests +unit-tests: + python -m unittest discover . + +.PHONY: clean +clean: + ${RM} -r __pycache__ *egg-info build dist + +.PHONY: build +build: + python setup.py build + +.PHONY: dist +dist: + python setup.py sdist bdist_wheel + +.PHONY: install +install: + pip install . + +.PHONY: dev-mode +dev-mode: + pip install --editable . diff --git a/apps/zmq-app-template/README.md b/apps/zmq-app-template/README.md new file mode 100644 index 00000000..517ff20f --- /dev/null +++ b/apps/zmq-app-template/README.md @@ -0,0 +1,23 @@ +Template for Apps that connect via ZeroMQ +========================================= + +Threat Bus is a publish-subscribe broker for threat intelligence. It is expected +that applications register themselves at the bus. This template provides a basic +`async` Python application that can (un)subsscribe to/from Threat Bus via +ZeroMQ. To connect via ZeroMQ, users must install and configure the +[`threatbus-zmq-app`](https://pypi.org/project/threatbus-zmq-app/) plugin on +their Threat Bus host. + +## Quick Start + +You can configure the app via a YAML configuration file. See +`config.yaml.example` for an example config file. + +Install the template in a virtualenv and start it with a config file: + +```sh +python -m venv venv +source venv/bin/activate +make dev-mode +zmq-app-template -c config.yaml +``` diff --git a/apps/zmq-app-template/config.yaml.example b/apps/zmq-app-template/config.yaml.example new file mode 100644 index 00000000..a769be1f --- /dev/null +++ b/apps/zmq-app-template/config.yaml.example @@ -0,0 +1,9 @@ +logging: + console: true + console_verbosity: DEBUG + file: true + file_verbosity: DEBUG + filename: zmq-app-template.log + +threatbus: localhost:13370 +snapshot: 30 diff --git a/apps/zmq-app-template/setup.py b/apps/zmq-app-template/setup.py new file mode 100644 index 00000000..376a9946 --- /dev/null +++ b/apps/zmq-app-template/setup.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from setuptools import setup + +with open("README.md", "r") as fh: + long_description = fh.read() + +setup( + author="Tenzir", + author_email="engineering@tenzir.com", + classifiers=[ + # https://pypi.org/classifiers/ + "Development Status :: 3 - Alpha", + "License :: OSI Approved :: BSD License", + "Operating System :: MacOS :: MacOS X", + "Operating System :: POSIX :: Linux", + "Topic :: Scientific/Engineering :: Information Analysis", + "Topic :: Scientific/Engineering :: Interface Engine/Protocol Translator", + "Topic :: Security", + "Topic :: Software Development :: Object Brokering", + "Topic :: System :: Distributed Computing", + ], + description="Application template to connect to Threat Bus via ZeroMQ", + entry_points={ + "console_scripts": ["zmq-app-template=zmq_app_template.template:main"] + }, + include_package_data=True, + install_requires=[ + "black >= 19.10b", + "coloredlogs >= 14.0", + "confuse", + "pyzmq >= 19", + "stix2 >= 2.1", + "threatbus >= 2021.4.29", + ], + keywords=[ + "open source", + "threatbus", + "Threat Bus", + "threat intelligence", + "TI", + "TI dissemination", + ], + license="BSD 3-clause", + long_description=long_description, + long_description_content_type="text/markdown", + name="zmq-app-template", + packages=["zmq_app_template"], + python_requires=">=3.7", + setup_requires=["setuptools", "wheel"], + url="https://github.com/tenzir/threatbus", + version="2021.04.29", +) diff --git a/apps/zmq-app-template/zmq_app_template/__init__.py b/apps/zmq-app-template/zmq_app_template/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/zmq-app-template/zmq_app_template/template.py b/apps/zmq-app-template/zmq_app_template/template.py new file mode 100755 index 00000000..277fd0c3 --- /dev/null +++ b/apps/zmq-app-template/zmq_app_template/template.py @@ -0,0 +1,324 @@ +#!/usr/bin/env python3 + +import argparse +import asyncio +import atexit +import coloredlogs +import confuse +import logging +import signal +from stix2 import parse +import sys +from threatbus.logger import setup as setup_logging_threatbus +import zmq + +logger_name = "zmq-app-template" +logger = logging.getLogger(logger_name) +# List of all running async tasks of the bridge. +async_tasks = [] +# The p2p topic sent back by Threat Bus upon successful subscription. +p2p_topic = None +# Boolean flag indicating that the user has issued a SIGNAL (e.g., SIGTERM). +user_exit = False + +### --------------------------- Application helpers --------------------------- + + +def setup_logging_with_level(level: str): + """ + Sets up a the global logger for console logging with the given loglevel. + @param level The loglevel to use, e.g., "DEBUG" + """ + global logger + log_level = logging.getLevelName(level.upper()) + + fmt = "%(asctime)s %(levelname)-8s %(message)s" + colored_formatter = coloredlogs.ColoredFormatter(fmt) + + handler = logging.StreamHandler() + handler.setLevel(log_level) + if logger.level > log_level or logger.level == 0: + logger.setLevel(log_level) + handler.setFormatter(colored_formatter) + logger.addHandler(handler) + + +def setup_logging_with_config(config: confuse.Subview): + """ + Sets up the global logger as configured in the `config` object. + @param config The user-defined logging configuration + """ + global logger + logger = setup_logging_threatbus(config, logger_name) + + +def validate_config(config: confuse.Subview): + assert config, "config must not be None" + config["threatbus"].get(str) + config["snapshot"].get(int) + + +async def cancel_async_tasks(): + """ + Cancels all async tasks. + """ + global async_tasks + for task in async_tasks: + if task is not asyncio.current_task(): + task.cancel() + del task + async_tasks = [] + return await asyncio.gather(*async_tasks) + + +async def stop_signal(): + """ + Implements Python's asyncio eventloop signal handler + https://docs.python.org/3/library/asyncio-eventloop.html + Cancels all running tasks and exits the app. + """ + global user_exit + user_exit = True + await cancel_async_tasks() + + +### --------------- ZeroMQ communication / management functions --------------- + + +def send_manage_message(endpoint: str, action: dict, timeout: int = 5): + """ + Sends a 'management' message, following the threatbus-zmq-app protocol to + either subscribe or unsubscribe this application to/from Threat Bus. + @param endpoint A host:port string to connect to via ZeroMQ + @param action The message to send as JSON + @param timeout The period after which the connection attempt is aborted + """ + context = zmq.Context() + socket = context.socket(zmq.REQ) + socket.setsockopt(zmq.LINGER, 0) + socket.connect(f"tcp://{endpoint}") + socket.send_json(action) + poller = zmq.Poller() + poller.register(socket, zmq.POLLIN) + + reply = None + if poller.poll(timeout * 1000): + reply = socket.recv_json() + socket.close() + context.term() + return reply + + +def reply_is_success(reply: dict): + """ + Predicate to check if `reply` is a dict and contains the key-value pair + "status" = "success" + @param reply A python dict + @return True if the dict contains "status" = "success" + """ + return ( + reply + and type(reply) is dict + and reply.get("status", None) + and reply["status"] == "success" + ) + + +def subscribe(endpoint: str, topic: str, snapshot: int, timeout: int = 5): + """ + Subscribes this app to Threat Bus for the given topic. Requests an optional + snapshot of historical indicators. + @param endpoint The ZMQ management endpoint of Threat Bus ('host:port') + @param topic The topic to subscribe to + @param snapshot An integer value to request n days of historical IoC items + @param timeout The period after which the connection attempt is aborted + """ + global logger + logger.info(f"Subscribing to topic '{topic}'...") + action = {"action": "subscribe", "topic": topic, "snapshot": snapshot} + return send_manage_message(endpoint, action, timeout) + + +def unsubscribe(endpoint: str, topic: str, timeout: int = 5): + """ + Unsubscribes this app from Threat Bus for the given topic. + @param endpoint The ZMQ management endpoint of Threat Bus + @param topic The topic to unsubscribe from + @param timeout The period after which the connection attempt is aborted + """ + global logger + logger.info(f"Unsubscribing from topic '{topic}' ...") + action = {"action": "unsubscribe", "topic": topic} + reply = send_manage_message(endpoint, action, timeout) + if not reply_is_success(reply): + logger.warning("Unsubscription failed") + return + logger.info("Unsubscription successful") + + +async def heartbeat(endpoint: str, p2p_topic: str, interval: int = 5): + """ + Sends heartbeats to Threat Bus periodically to check if the given p2p_topic + is still valid at the Threat Bus host. Cancels all async tasks of this app + when the heartbeat fails and stops the heartbeat. + @param endpoint The ZMQ management endpoint of Threat Bus + @param p2p_topic The topic string to include in the heartbeat + @param timeout The period after which the connection attempt is aborted + """ + global logger + action = {"action": "heartbeat", "topic": p2p_topic} + while True: + reply = send_manage_message(endpoint, action, interval) + if not reply_is_success(reply): + logger.error("Subscription with Threat Bus host became invalid") + return await cancel_async_tasks() + await asyncio.sleep(interval) + + +### --------------------------- The actual app logic --------------------------- + + +async def start(zmq_endpoint: str, snapshot: int): + """ + Starts the template app. + @param zmq_endpoint The ZMQ management endpoint of Threat Bus ('host:port') + @param snapshot An integer value to request n days of historical IoC items + """ + global logger, async_tasks, p2p_topic + # needs to be created inside the same eventloop where it is used + logger.debug(f"Calling Threat Bus management endpoint {zmq_endpoint}") + reply = subscribe(zmq_endpoint, "stix2/indicator", snapshot) + if not reply_is_success(reply): + logger.error("Subscription failed") + return + pub_endpoint = reply.get("pub_endpoint", None) + sub_endpoint = reply.get("sub_endpoint", None) + topic = reply.get("topic", None) + if not pub_endpoint or not sub_endpoint or not topic: + logger.error("Subscription failed") + return + + logger.info(f"Subscription successful. New p2p_topic: {topic}") + if p2p_topic: + # The 'start' function is called as result of a restart + # Unsubscribe the old topic as soon as we get a working connection + logger.info("Cleaning up old p2p_topic subscription ...") + unsubscribe(zmq_endpoint, p2p_topic) + atexit.unregister(unsubscribe) + p2p_topic = topic + atexit.register(unsubscribe, zmq_endpoint, topic) + + # Start a heartbeat task so we notice when the Threat Bus host goes away + async_tasks.append( + asyncio.create_task(heartbeat(zmq_endpoint, p2p_topic, interval=5)) + ) + + # Start a receive task to retrieve real-time updates from Threat Bus + indicator_queue = asyncio.Queue() + async_tasks.append( + asyncio.create_task(receive(pub_endpoint, p2p_topic, indicator_queue)) + ) + async_tasks.append(asyncio.create_task(do_something_with_intel(indicator_queue))) + + loop = asyncio.get_event_loop() + for s in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]: + loop.add_signal_handler(s, lambda: asyncio.create_task(stop_signal())) + return await asyncio.gather(*async_tasks) + + +async def receive(pub_endpoint: str, topic: str, indicator_queue: asyncio.Queue): + """ + Starts a zmq subscriber on the given endpoint and listens for new messages + that are published on the given topic (zmq prefix matching). Depending on + the topic suffix, Indicators are enqueued to the indicator_queue. + @param pub_endpoint A host:port string to connect to via zmq + @param topic The topic prefix to subscribe to intelligence items + @param indicator_queue The queue to put arriving IoCs into + """ + global logger + socket = zmq.Context().socket(zmq.SUB) + socket.connect(f"tcp://{pub_endpoint}") + socket.setsockopt(zmq.SUBSCRIBE, topic.encode()) + poller = zmq.Poller() + poller.register(socket, zmq.POLLIN) + logger.info(f"Receiving via ZMQ on topic {pub_endpoint}/{topic}") + while True: + socks = dict(poller.poll(timeout=100)) # Smaller timeouts increase CPU load + if socket in socks and socks[socket] == zmq.POLLIN: + try: + topic, msg = socket.recv().decode().split(" ", 1) + except Exception as e: + logger.error(f"Error decoding message: {e}") + continue + # The topic is suffixed with the message type. Use it for filtering + if not topic.endswith("indicator"): + logger.debug(f"Skipping unsupported message: {msg}") + continue + # Put the message into the queue for incoming intel items, so they + # can be processed asynchronously + await indicator_queue.put(msg) + else: + await asyncio.sleep(0.01) # Free event loop for other tasks + + +async def do_something_with_intel(indicator_queue: asyncio.Queue): + """ + Does something with the received indicators. + @param indicator_queue The queue to put arriving IoCs into + """ + while True: + msg = await indicator_queue.get() + indicator = parse(msg, allow_custom=True) + logger.debug(f"Got indicator from Threat Bus: {indicator}") + # Do something with it, e.g., check its type and then start operating on + # the timestamps, the STIX pattern, ... + indicator_queue.task_done() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--config", "-c", help="Path to a configuration file") + args = parser.parse_args() + + # Note that you must use names without dashes, use underscores instead for + # `confuse` to work without errors. + # Confuse uses the configuration name to lookup environment variables, but + # it simply upper-cases that name. Dashes are not replaced properly. Using a + # dash in the configuration name makes it impossible to configure the + # APPNAMEDIR env variable to overwrite search paths, i.e., in systemd + # https://confit.readthedocs.io/en/latest/#search-paths + # https://github.com/beetbox/confuse/blob/v1.4.0/confuse/core.py#L555 + config = confuse.Configuration("zmq_app_template") + config.set_args(args) + if args.config: + config.set_file(args.config) + + try: + validate_config(config) + except Exception as e: + sys.exit(ValueError(f"Invalid config: {e}")) + + if config["logging"].get(dict): + setup_logging_with_config(config["logging"]) + else: + setup_logging_with_level(config["loglevel"].get(str)) + + while True: + try: + asyncio.run( + start( + config["threatbus"].get(), + config["snapshot"].get(), + ) + ) + except (KeyboardInterrupt, SystemExit): + return + except asyncio.CancelledError: + if user_exit: + # Tasks were cancelled because the user stopped the app. + return + logger.info("Restarting template app ...") + + +if __name__ == "__main__": + main()