Skip to content

Commit

Permalink
Merge pull request #280 from splunk/feature/version-bumping-enforcement
Browse files Browse the repository at this point in the history
Feature: Adding version enforcement
  • Loading branch information
pyth0n1c authored Sep 17, 2024
2 parents 7db0d49 + 094b4bb commit bf6fe08
Show file tree
Hide file tree
Showing 10 changed files with 969 additions and 117 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ This section is under active development. It will allow you to a [MITRE Map](ht
Choose TYPE {detection, story} to create new content for the Content Pack. The tool will interactively ask a series of questions required for generating a basic piece of content and automatically add it to the Content Pack.

### contentctl inspect
This section is under development. It will enable the user to perform an appinspect of the content pack in preparation for deployment onto a Splunk Instance or via Splunk Cloud.
This section is under development. The inspect action performs a number of post-build validations. Primarily, it will enable the user to perform an appinspect of the content pack in preparation for deployment onto a Splunk Instance or via Splunk Cloud. It also compares detections in the new build against a prior build, confirming that any changed detections have had their versions incremented (this comparison happens at the savedsearch.conf level, which is why it must happen after the build). Please also note that new versions of contentctl may result in the generation of different savedsearches.conf files without any content changes in YML (new keys at the .conf level which will necessitate bumping of the version in the YML file).

### contentctl deploy
The reason to build content is so that it can be deployed to your environment. However, deploying content to multiple servers and different types of infrastructure can be tricky and time-consuming. contentctl makes this easy by supporting a number of different deployment mechanisms. Deployment targets can be defined in [contentctl.yml](/contentctl/templates/contentctl_default.yml).
Expand Down
280 changes: 189 additions & 91 deletions contentctl/actions/inspect.py

Large diffs are not rendered by default.

151 changes: 141 additions & 10 deletions contentctl/helper/splunk_app.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import os
import time
import json
from typing import Optional, Collection
from pathlib import Path
import xml.etree.ElementTree as ET
from typing import List, Tuple, Optional
from urllib.parse import urlencode

import requests
import urllib3
import xmltodict
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from urllib3.util.retry import Retry

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

MAX_RETRY = 3


class APIEndPoint:
"""
Class which contains Static Endpoint
Expand All @@ -27,6 +27,7 @@ class APIEndPoint:
SPLUNK_BASE_GET_UID_REDIRECT = "https://apps.splunk.com/apps/id/{app_name_id}"
SPLUNK_BASE_APP_INFO = "https://splunkbase.splunk.com/api/v1/app/{app_uid}"


class RetryConstant:
"""
Class which contains Retry Constant
Expand All @@ -53,11 +54,11 @@ class InitializationError(Exception):

@staticmethod
def requests_retry_session(
retries=RetryConstant.RETRY_COUNT,
backoff_factor=1,
status_forcelist=(500, 502, 503, 504),
session=None,
):
retries: int = RetryConstant.RETRY_COUNT,
backoff_factor: int = 1,
status_forcelist: Collection[int] = (500, 502, 503, 504),
session: requests.Session | None = None,
) -> requests.Session:
session = session or requests.Session()
retry = Retry(
total=retries,
Expand Down Expand Up @@ -260,4 +261,134 @@ def set_latest_version_info(self) -> None:

# parse out the version number and fetch the download URL
self.latest_version = info_url.split("/")[-1]
self.latest_version_download_url = self.__fetch_url_latest_version_download(info_url)
self.latest_version_download_url = self.__fetch_url_latest_version_download(info_url)

def __get_splunk_base_session_token(self, username: str, password: str) -> str:
"""
This method will generate Splunk base session token
:param username: Splunkbase username
:type username: str
:param password: Splunkbase password
:type password: str
:return: Splunk base session token
:rtype: str
"""
# Data payload for fetch splunk base session token
payload = urlencode(
{
"username": username,
"password": password,
}
)

headers = {
"content-type": "application/x-www-form-urlencoded",
"cache-control": "no-cache",
}

response = requests.request(
"POST",
APIEndPoint.SPLUNK_BASE_AUTH_URL,
data=payload,
headers=headers,
)

token_value = ""

if response.status_code != 200:
msg = (
f"Error occurred while executing the rest call for splunk base authentication api,"
f"{response.content}"
)
raise Exception(msg)
else:
root = ET.fromstring(response.content)
token_value = root.find("{http://www.w3.org/2005/Atom}id").text.strip()
return token_value

def download(
self,
out: Path,
username: str,
password: str,
is_dir: bool = False,
overwrite: bool = False
) -> Path:
"""
Given an output path, download the app to the specified location
:param out: the Path to download the app to
:type out: :class:`pathlib.Path`
:param username: Splunkbase username
:type username: str
:param password: Splunkbase password
:type password: str
:param is_dir: a flag indicating whether out is directory, otherwise a file (default: False)
:type is_dir: bool
:param overwrite: a flag indicating whether we can overwrite the file at out or not
:type overwrite: bool
:returns path: the Path the download was written to (needed when is_dir is True)
:rtype: :class:`pathlib.Path`
"""
# Get the Splunkbase session token
token = self.__get_splunk_base_session_token(username, password)
response = requests.request(
"GET",
self.latest_version_download_url,
cookies={
"sessionid": token
}
)

# If the provided output path was a directory we need to try and pull the filename from the
# response headers
if is_dir:
try:
# Pull 'Content-Disposition' from the headers
content_disposition: str = response.headers['Content-Disposition']

# Attempt to parse the filename as a KV
key, value = content_disposition.strip().split("=")
if key != "attachment;filename":
raise ValueError(f"Unexpected key in 'Content-Disposition' KV pair: {key}")

# Validate the filename is the expected .tgz file
filename = Path(value.strip().strip('"'))
if filename.suffixes != [".tgz"]:
raise ValueError(f"Filename has unexpected extension(s): {filename.suffixes}")
out = Path(out, filename)
except KeyError as e:
raise KeyError(
f"Unable to properly extract 'Content-Disposition' from response headers: {e}"
) from e
except ValueError as e:
raise ValueError(
f"Unable to parse filename from 'Content-Disposition' header: {e}"
) from e

# Ensure the output path is not already occupied
if out.exists() and not overwrite:
msg = (
f"File already exists at {out}, cannot download the app."
)
raise Exception(msg)

# Make any parent directories as needed
out.parent.mkdir(parents=True, exist_ok=True)

# Check for HTTP errors
if response.status_code != 200:
msg = (
f"Error occurred while executing the rest call for splunk base authentication api,"
f"{response.content}"
)
raise Exception(msg)

# Write the app to disk
with open(out, "wb") as file:
file.write(response.content)

return out
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,11 @@ def metadata(self) -> dict[str, str|float]:
# NOTE: we ignore the type error around self.status because we are using Pydantic's
# use_enum_values configuration
# https://docs.pydantic.dev/latest/api/config/#pydantic.config.ConfigDict.populate_by_name


# NOTE: The `inspect` action is HIGHLY sensitive to the structure of the metadata line in
# the detection stanza in savedsearches.conf. Additive operations (e.g. a new field in the
# dict below) should not have any impact, but renaming or removing any of these fields will
# break the `inspect` action.
return {
'detection_id': str(self.id),
'deprecated': '1' if self.status == DetectionStatus.deprecated.value else '0', # type: ignore
Expand Down
109 changes: 96 additions & 13 deletions contentctl/objects/config.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
from __future__ import annotations

from os import environ
from datetime import datetime, UTC
from typing import Optional, Any, List, Union, Self
import random
from enum import StrEnum, auto
import pathlib
from urllib.parse import urlparse
from abc import ABC, abstractmethod
from functools import partialmethod

import tqdm
import semantic_version
from pydantic import (
BaseModel, Field, field_validator,
field_serializer, ConfigDict, DirectoryPath,
PositiveInt, FilePath, HttpUrl, AnyUrl, model_validator,
ValidationInfo
)

from contentctl.objects.constants import DOWNLOADS_DIRECTORY
from contentctl.output.yml_writer import YmlWriter
from os import environ
from datetime import datetime, UTC
from typing import Optional,Any,Annotated,List,Union, Self
import semantic_version
import random
from enum import StrEnum, auto
import pathlib
from contentctl.helper.utils import Utils
from urllib.parse import urlparse
from abc import ABC, abstractmethod
from contentctl.objects.enums import PostTestBehavior, DetectionTestingMode
from contentctl.objects.detection import Detection
from contentctl.objects.annotated_types import APPID_TYPE
import tqdm
from functools import partialmethod
from contentctl.helper.splunk_app import SplunkApp

ENTERPRISE_SECURITY_UID = 263
COMMON_INFORMATION_MODEL_UID = 1621
Expand Down Expand Up @@ -252,11 +257,89 @@ class StackType(StrEnum):
classic = auto()
victoria = auto()


class inspect(build):
splunk_api_username: str = Field(description="Splunk API username used for running appinspect.")
splunk_api_password: str = Field(exclude=True, description="Splunk API password used for running appinspect.")
splunk_api_username: str = Field(
description="Splunk API username used for appinspect and Splunkbase downloads."
)
splunk_api_password: str = Field(
exclude=True,
description="Splunk API password used for appinspect and Splunkbase downloads."
)
enable_metadata_validation: bool = Field(
default=False,
description=(
"Flag indicating whether detection metadata validation and versioning enforcement "
"should be enabled."
)
)
enrichments: bool = Field(
default=True,
description=(
"[NOTE: enrichments must be ENABLED for inspect to run. Please adjust your config "
f"or CLI invocation appropriately] {validate.model_fields['enrichments'].description}"
)
)
# TODO (cmcginley): wording should change here if we want to be able to download any app from
# Splunkbase
previous_build: str | None = Field(
default=None,
description=(
"Local path to the previous app build for metatdata validation and versioning "
"enforcement (defaults to the latest release of the app published on Splunkbase)."
)
)
stack_type: StackType = Field(description="The type of your Splunk Cloud Stack")

@field_validator("enrichments", mode="after")
@classmethod
def validate_needed_flags_metadata_validation(cls, v: bool, info: ValidationInfo) -> bool:
"""
Validates that `enrichments` is True for the inspect action
:param v: the field's value
:type v: bool
:param info: the ValidationInfo to be used
:type info: :class:`pydantic.ValidationInfo`
:returns: bool, for v
:rtype: bool
"""
# Enforce that `enrichments` is True for the inspect action
if v is False:
raise ValueError("Field `enrichments` must be True for the `inspect` action")

return v

def get_previous_package_file_path(self) -> pathlib.Path:
"""
Returns a Path object for the path to the prior package build. If no path was provided, the
latest version is downloaded from Splunkbase and it's filepath is returned, and saved to the
in-memory config (so download doesn't happen twice in the same run).
:returns: Path object to previous app build
:rtype: :class:`pathlib.Path`
"""
previous_build_path = self.previous_build
# Download the previous build as the latest release on Splunkbase if no path was provided
if previous_build_path is None:
print(
f"Downloading latest {self.app.label} build from Splunkbase to serve as previous "
"build during validation..."
)
app = SplunkApp(app_uid=self.app.uid)
previous_build_path = app.download(
out=pathlib.Path(DOWNLOADS_DIRECTORY),
username=self.splunk_api_username,
password=self.splunk_api_password,
is_dir=True,
overwrite=True
)
print(f"Latest release downloaded from Splunkbase to: {previous_build_path}")
self.previous_build = str(previous_build_path)
return pathlib.Path(previous_build_path)


class NewContentType(StrEnum):
detection = auto()
story = auto()
Expand Down
5 changes: 4 additions & 1 deletion contentctl/objects/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,7 @@
RBA_OBSERVABLE_ROLE_MAPPING = {
"Attacker": 0,
"Victim": 1
}
}

# The relative path to the directory where any apps/packages will be downloaded
DOWNLOADS_DIRECTORY = "downloads"
Loading

0 comments on commit bf6fe08

Please sign in to comment.