Home

Awesome

<div align="center"> <a href="https://www.attackiq.com" target="_blank"> <img src="https://www.attackiq.com/wp-content/uploads/2021/10/col-dflt.png" height="300" alt="AttackIQ"> </a> </div> <h1 align="center">SigmAIQ: pySigma Wrapper & Utils</h1>

Tests Coverage Badge Status PyPI version Python versions pySigma version License

Table of Contents

Introduction

SigmAIQ is a wrapper for pySigma and pySigma backends & pipelines. It allows detection engineers to easily convert Sigma rules and rule collections to SIEM/product queries without having to worry about the overhead of ensuring the correct pipelines and output formats are used by each pySigma supported backend. SigmAIQ also contains custom pipelines and output formats for various backends that are not found in the original backend source code. If you don't see a backend that's currently supported, please consider contributing to the Sigma/pySigma community by making it with this pySigma Cookiecutter Template

In addition, SigmAIQ contains pySigma related tools and scripts, including easy Sigma rule searching, LLM support, an automatic rule creation from IOCs.

This library is currently maintained by:

Project Status

SigmAIQ is currently in pre-release status. It is a constant work-in-progress and bugs may be encountered. Please report any issues here.

Feature requests are always welcome! pySigma tools/utils are currently not in the pre-release version, and will be added in future releases.

LLM Support

For LLM usage, see the LLM README

Installation & Usage

Requirements

Installation

SigmAIQ can be installed with your favorite package manager:

pip install sigmaiq
pipenv install sigmaiq
poetry add sigmaiq

To install the LLM dependencies, use the llm extra:

pip install sigmaiq[llm]
pipenv install sigmaiq[llm]
poetry add sigmaiq[llm]

Usage Quickstart

Create a backend from the list of available backends, then give a valid Sigma rule to convert to a query. You can find the list of available backends in this README, or SigmAIQBackend.display_available_backends().

from sigmaiq import SigmAIQBackend

sigma_rule = """
    title: Test Rule
    logsource:
        category: process_creation
        product: windows
    detection:
        sel:
            CommandLine: mimikatz.exe
        condition: sel
"""

# Create backend
backend = SigmAIQBackend(backend="microsoft365defender").create_backend()

# Convert Rule or Collection
output = backend.translate(sigma_rule)
print(output)

Output:

['DeviceProcessEvents
| where ProcessCommandLine =~ "mimikatz.exe"']

Although you can pass a SigmaRule or SigmaCollection object to translate() like you would to convert() or convert_rule() for a typical pySigma backend, there is no need with SigmAIQ. As long as a valid Sigma rule is given as a YAML str or dictionary (or list of), SigmAIQ will take care of it for you.

Usage Examples

Backends

Typical usage will be using the SigmAIQBackend class from sigmaiq to create a customized pySigma backend, then use translate() to convert a SigmaRule or SigmaCollection to a query:

from sigmaiq import SigmAIQBackend
from sigma.rule import SigmaRule

sigma_rule = SigmaRule.from_yaml(
    """
    title: Test Rule
    logsource:
        category: process_creation
        product: windows
    detection:
        sel:
            CommandLine: mimikatz.exe
        condition: sel
    """
)

backend = SigmAIQBackend(backend="splunk").create_backend()
print(backend.translate(sigma_rule))

Output: ['CommandLine="mimikatz.exe"']

Specifying Output Formats

Passing the output_format arg will use an original output specified by the original backend, or a custom format implemented by SigmAIQ. You can find information about output formats specific to each backend via SigmAIQBackend.display_backends_and_outputs()The necessary processing pipelines are automatically applied, even if the original pySigma backend does not automatically apply it:

from sigmaiq import SigmAIQBackend
from sigma.rule import SigmaRule
from sigma.backends.splunk import SplunkBackend

sigma_rule = SigmaRule.from_yaml(
    """
    title: Test Rule
    logsource:
        category: process_creation
        product: windows
    detection:
        sel:
            CommandLine: mimikatz.exe
        condition: sel
    """
)
# Raises sigma.exceptions.SigmaFeatureNotSupportedByBackendError
orig_backend = SplunkBackend()
print("Original Backend:")
try:
    print(orig_backend.convert_rule(sigma_rule, output_format="data_model"))
except Exception as exc:
    print(exc)
print("\n")

# Necessary pipeline for output_format automatically applied
print("SigmAIQ Backend:")
sigmaiq_backend = SigmAIQBackend(backend="splunk", output_format="data_model").create_backend()
print(sigmaiq_backend.translate(sigma_rule))

Output:

Original Backend:
No data model specified by processing pipeline

SigmAIQ Backend:
['| tstats summariesonly=false allow_old_summaries=true fillnull_value="null" count min(_time) as firstTime max(_time) 
as lastTime from datamodel=Endpoint.Processes where Processes.process="mimikatz.exe" by Processes.process 
Processes.dest Processes.process_current_directory Processes.process_path Processes.process_integrity_level 
Processes.parent_process Processes.parent_process_path Processes.parent_process_guid Processes.parent_process_id 
Processes.process_guid Processes.process_id Processes.user | `drop_dm_object_name(Processes)` 
| convert timeformat="%Y-%m-%dT%H:%M:%S" ctime(firstTime) | convert timeformat="%Y-%m-%dT%H:%M:%S" ctime(lastTime) ']

Pipelines

Specifying Pipelines

You can specify a specific pipeline to be applied to the SigmaRule by passing it to the backend factory. Generally, you want to only apply pipelines to a backend meant for that specific backend. You can use a name of a pipeline as defined in SigmAIQPipeline.display_available_pipelines(), or pass any pySigma ProcessingPipeline object. The pipeline can be passed directory to SigmAIQPipeline, or created with SigmAIQPipeline.

from sigmaiq import SigmAIQBackend, SigmAIQPipeline

# Directly to backend
backend = SigmAIQBackend(backend="elasticsearch",
                         processing_pipeline="ecs_zeek_beats").create_backend()

# Create pipeline first, then pass to backend
pipeline = SigmAIQPipeline(processing_pipeline="ecs_zeek_beats").create_pipeline()
backend = SigmAIQBackend(backend="elasticsearch",
                         processing_pipeline=pipeline).create_backend()

Combining Multiple Pipelines

The SigmAIQPipelineResolver class automates combining multiple pipelines together via pySigma's ProcessingPipelineResolver class. This results in a single ProcessingPipeline object that are applied in order of priority of each ProcessingPipeline's priority. You can pass any named available pipeline, ProcessingPipeline object, or callable that returns any valid combination of these two types:

from sigmaiq import SigmAIQPipelineResolver
from sigma.pipelines.sysmon import sysmon_pipeline
from sigma.pipelines.sentinelone import sentinelone_pipeline

# ProcessingPipeline Object
proc_pipeline_obj = sysmon_pipeline()

# Available Pipeline Name
pipeline_named = "splunk_windows"

my_pipelines = [sysmon_pipeline(),  # ProcessingPipeline type
                "splunk_windows",  # Available pipeline name
                sentinelone_pipeline  # Callable that returns a ProcessingPipeline type
                ]

my_pipeline = SigmAIQPipelineResolver(processing_pipelines=my_pipelines).process_pipelines(
    name="My New Optional Pipeline Name")

print(f"Created single new pipeline from {len(my_pipelines)} pipelines.")
print(f"New pipeline '{my_pipeline.name}' contains {len(my_pipeline.items)} ProcessingItems.")

Output:

Created single new pipeline from 3 pipelines.
New pipeline 'My New Optional Pipeline Name' contains 103 ProcessingItems.

Custom Fieldmappings

A dictionary can be used to create a custom fieldmappings pipeline on the fly. Each key should be the original fieldname, with each value being a new fieldname or list of new fieldnames:

from sigmaiq import SigmAIQPipeline
from sigma.rule import SigmaRule

sigma_rule = SigmaRule.from_yaml(
    """
    title: Test Rule
    logsource:
        category: process_creation
        product: windows
    detection:
        sel:
            CommandLine: mimikatz.exe
        condition: sel
    """
)

custom_fieldmap = {'CommandLine': 'NewCommandLineField'}
custom_pipeline = SigmAIQPipeline.from_fieldmap(custom_fieldmap).create_pipeline()
print(f"Original Fieldname: {list(sigma_rule.detection.detections.values())[0].detection_items[0].field}")
custom_pipeline.apply(sigma_rule)
print(f"New Fieldname: {list(sigma_rule.detection.detections.values())[0].detection_items[0].field}")

Output:

Original Fieldname: CommandLine
New Fieldname: NewCommandLineField

All-In-One Conversion

The create_all_and_translate() method for the backend factory will automatically create backends for all possible available backends, and create queries for all possible pipelines & output formats for each backend. If show_errors=False (default), any invalid queries due to pipeline errors, such as unsupported fields, will be left out of the results dictionary:

from sigmaiq import SigmAIQBackend
from sigma.rule import SigmaRule
from pprint import pprint

sigma_rule = SigmaRule.from_yaml(
    """
    title: Test Rule
    logsource:
        category: process_creation
        product: windows
    detection:
        sel:
            CommandLine: mimikatz.exe
        condition: sel
    """
)

output = SigmAIQBackend.create_all_and_translate(sigma_rule)
pprint(output)

Output:

{backend: {pipeline: {output_format: query} } }

{'carbonblack': {'carbonblack': {'default': ['os_type:windows '
                                             'cmdline:mimikatz.exe'],
                                 'json': [{'description': None,
                                           'id': None,
                                           'query': 'os_type:windows '
                                                    'cmdline:mimikatz.exe',
                                           'title': 'Test Rule'}]},
                 'carbonblack_enterprise': {'default': ['device_os:WINDOWS '
                                                        'process_cmdline:mimikatz.exe'],
                                            'json': [{'description': None,
                                                      'id': None,
                                                      'query': 'device_os:WINDOWS '
                                                               'process_cmdline:mimikatz.exe',
                                                      'title': 'Test Rule'}]}},
 'crowdstrike_splunk': {'crowdstrike': {'default': ['event_simpleName="ProcessRollup2" '
                                                    'CommandLine="mimikatz.exe"']}},
 'crowdstrike_logscale': {'crowdstrike': {'default': ['event_simpleName="ProcessRollup2" '
                                                    'CommandLine="mimikatz.exe"']}},
 'elasticsearch': {'ecs_windows': {'default': ['process.command_line:mimikatz.exe'],
 ...

Supported Options

Backends

Available Backends

Backend OptionDescriptionAssociated PipelinesDefault Pipeline
carbonblackCarbon Black EDRcarbonblack<br>carbonblack_enterprisecarbonblack
cortexxdrPalo Alto Cortex XDRcortexxdrcortexxdr
crowdstrike_splunkCrowdstrike FDR Splunk Querycrowdstrike_fdrcrowdstrike_fdr
crowdstrike_logscaleCrowdstrike Logscale Querycrowdstrike_falconcrowdstrike_falcon
elasticsearchElastic Elasticsearch SIEMecs_windows<br>ecs_kubernetes<br>ecs_windows_old<br>ecs_zeek_beats<br>ecs_zeek_corelight<br>zeek_rawecs_windows
insightidrRapid7 InsightIDR SIEMinsightidrinsightidr
lokiGrafana Loki LogQL SIEMloki_grafana_logfmt<br>loki_promtail_sysmon<br>loki_okta_system_logloki_grafana_logfmt
microsoft_xdrMicrosoft XDR Advanced Hunting Query (KQL) (Defender, Office365, etc)microsoft_xdrmicrosoft_xdr
microsoft_sentinel_asimMicrosoft Sentinel ASIM Query (KQL)sentinel_asimsentinel_asim
microsoft_azure_monitorMicrosoft Azure Monitor Query (KQL)azure_monitorazure_monitor
netwitnessNetwitness Querynetwitness_windowsnetwitness_windows
opensearchOpenSearch Luceneecs_windows<br>ecs_windows_old<br>ecs_zeek_beats<br>ecs_zeek_corelight<br>zeek_rawecs_windows
qradarIBM QRadarqradar_fields<br>qradar_payloadqradar_fields
secopsGoogle SecOps (Chronicle)secops_udmsecops_udm
sentineloneSentinelOne EDRsentinelonesentinelone
splunkSplunk SIEMsplunk_windows<br>splunk_wineventlog<br>splunk_windows_sysmon_acc<br>splunk_cim_dmsplunk_windows
sigmaOriginal YAML/JSON Sigma Rule Outputsigma_defaultsigma_default
stixSTIX 2.0 & STIX Shifter Queriesstix_2_0<br>stix_shifterstix_2_0

Backend Output Formats

Backend OptionOutput Format OptionDescription
carbonblackdefault<br>jsonPlain CarbonBlack queries<br>CarbonBlack JSON query
cortexxdrdefault<br>jsonPlain CortexXDR queries<br>json output format
crowdstrike_splunkdefaultPlain SPL queries
crowdstrike_logscaledefaultCrowdStrike LogScale queries
elasticsearchdefault<br>kibana_ndjson<br>dsl_lucene<br>siem_rule<br>siem_rule_ndjsonPlain Elasticsearch Lucene queries<br>Kibana NDJSON import file with Lucene queries<br>Elasticsearch query DSL with embedded Lucene queries<br>Elasticsearch query DSL as SIEM Rules in JSON Format<br>Elasticsearch query DSL as SIEM Rules in NDJSON Format
insightidrdefault<br>leql_advanced_search<br>leql_detection_definitionSimple log search query mode<br>Advanced Log Entry Query Language (LEQL) queries<br>LEQL format roughly matching the 'Rule Logic' tab in ABA detection rule definition
lokidefault<br>rulerPlain Loki queries<br>Loki 'ruler' output format for generating alerts
microsoft_xdrdefaultKusto Query Language search strings
microsoft_sentinel_asimdefaultKusto Query Language search strings
microsoft_azure_monitordefaultKusto Query Language search strings
netwitnessdefaultPlain netwitness queries
opensearchdefault<br>dashboards_ndjson<br>monitor_rule<br>dsl_lucenePlain OpenSearch Lucene queries<br>OpenSearch Dashboards NDJSON import file with Lucene queries<br>OpenSearch monitor rule with embedded Lucene query<br>OpenSearch query DSL with embedded Lucene queries
qradardefaultPlain QRadar queries
secopsdefault<br>yara_lPlain UDM queries<br>YARA-L 2.0 Detection Rules Output Format
sentinelonedefault<br>jsonPlaintext<br>JSON format
splunkdefault<br>savedsearches<br>data_model<br>stanzaPlain SPL queries<br>Plain SPL in a savedsearches.conf file<br>Data model queries with tstats<br>Enterprise Security savedsearches.conf stanza
sigmadefault<br>yaml<br>jsonDefault output format<br>Default Sigma Rule output format<br>JSON style Sigma Rule Output
stixdefaultPlain stix queries

Pipelines

Available Named Pipelines

Pipeline OptionDescriptionDisplay Name
splunk_wineventlogSigmAIQ Custom combined windows_audit and splunk_windows pipelines to convert Sysmon fields to Windows Event Log fields for Splunk searchesSplunk WinEventLog
carbonblackUses Carbon Black EDR field mappingsCB
cortexxdrUses Palo Alto Cortex XDR field mappingsPalo Alto Cortex XDR
carbonblack_enterpriseUses Carbon Black Enterprise EDR field mappingsCB
crowdstrike_fdrCrowdstrike FDR Splunk MappingsCrowdStrike FDR SPL
crowdstrike_falconCrowdstrike Falcon Logscale MappingsCrowdStrike Falcon Logscale
ecs_kubernetesElastic Common Schema (ECS) Kubernetes audit log mappingsECS Kubernetes
ecs_windowsElastic Common Schema (ECS) Windows log mappings from Winlogbeat from version 7ECS Winlogbeat
ecs_windows_oldElastic Common Schema (ECS) Windows log mappings from Winlogbeat up to version 6ESC Winlogbeat (<= v6.x)
ecs_zeek_beatsElastic Common Schema (ECS) for Zeek using filebeat >= 7.6.1ECS Zeek (Elastic)
ecs_zeek_corelightElastic Common Schema (ECS) mapping from CorelightESC Zeek (Corelight)
zeek_rawZeek raw JSON field namingZeek Raw JSON
insightidrInsightIDR Log Entry Query Language (LEQL) TransformationsInsightIDR LEQL
loki_grafana_logfmtConverts field names to logfmt labels used by GrafanaLogfmt Labels
loki_promtail_sysmonParse and adjust field names for Windows sysmon data produced by promtailWinSysmon Promtail
loki_okta_system_logParse the Okta System Log event json, adjusting field-names appropriatelyOkta System Event
microsoft_xdrMappings for Sysmon -> XDR Advanced Hunting Query Table SchemaMicrosoft XDR KustoQL
sentinel_asimMappings for Sysmon -> Sentinel ASIM Query Table SchemaSentinel ASIM KustoQL
azure_monitorMappings for Sysmon -> Azure Monitor Query Table SchemaAzure Monitor KustoQL
netwitness_windowsNetwitness Windows log mappingsNetwitness Windows
qradar_fieldsSupports only the Sigma fields in the Field MappingSigma Fields
qradar_payloadUses UTF8(payload) instead of fields unsupported by the Field Mapping.UTF8(payload) (Non-Sigma Fields)
sigma_defaultEmpty ProcessingPipeline placeholderSigma
secops_udmMappings for Google SecOps (Chronicle) UDMGoogle SecOps UDM
sentineloneMappings for SentinelOne Deep Visibility QueriesSentinelOne Deep Visibility
splunk_windowsSplunk Query, Windows MappingsSplunk Query (Windows)
splunk_windows_sysmon_accSplunk Windows Sysmon search acceleration keywordsSplunk Query (Sysmon)
splunk_cim_dmSplunk Datamodel Field MappingsSplunk Datamodel Query
stix_2_0STIX 2.0 MappingsSTIX 2.0
stix_shifterSTIX Shifter MappingsSTIX Shifter
windows_sysmonSysmon for WindowsSysmon
windows_auditWindows Event LogsWindows Event Logs
windows_logsourceWindows Logs, GeneralWindows Logs, General

Contributing

We welcome contributions to SigmAIQ! Here's how you can contribute:

  1. Fork the repository
  2. Create a new branch for your feature or bug fix
  3. Make your changes and commit them with a clear commit message
  4. Push your changes to your fork
  5. Submit a pull request to the main repository

Please ensure your code adheres to the project's coding standards and includes appropriate tests.

License

This project is licensed under the LGPL License.

Maintainers

This library is currently maintained by: