StreamAlert

Overview

StreamAlert is a serverless, real-time data analysis framework which empowers you to ingest, analyze, and alert on data from any environment, using data sources and alerting logic you define. Computer security teams use StreamAlert to scan terabytes of log data every day for incident detection and response.

Incoming log data is classified and processed by the rules engine, and any resulting alerts are sent to one or more outputs.

StreamAlert High Level Architecture

Features

  • Rules are written in Python; they can utilize any Python libraries or functions (see the example sketch after this list)

  • Ingested logs and generated alerts can be retroactively searched for compliance and research

  • Serverless design is cheaper, easier to maintain, and scales to terabytes per day

  • Deployment is automated: simple, safe and repeatable for any AWS account

  • Secure by design: least-privilege execution, containerized analysis, and encrypted data storage

  • Merge similar alerts and automatically promote new rules if they are not too noisy

  • Built-in support for dozens of log types and schemas

  • Built-in collection of broadly applicable community rules

  • Fully open source and customizable: add your own log schemas, rules, and alert outputs
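
For instance, a minimal sketch of a rule, assuming the v3 import path and a hypothetical example_log schema and output descriptor:

from streamalert.shared.rule import rule

@rule(logs=['example_log'],           # hypothetical log schema
      outputs=['aws-sns:my-topic'])   # hypothetical output descriptor
def example_root_login(rec):
    """Alert when a record indicates a root login."""
    return rec['user'] == 'root'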

Ready? Let’s get started!

Getting Started

It only takes a few minutes to get StreamAlert up and running! These instructions have been tested on macOS, but should also work on most Linux systems.

Install Dependencies

  1. Install Python 3.7 and pip

  2. Install Terraform >= v0.13.0:

brew install terraform  # macOS Homebrew
terraform --version     # Must be >= v0.13.0

Note

Terraform versions lower than v0.13.0 are not supported.

  3. If you are using Linux, you may need to install the Python development libraries:

sudo apt install python-dev    # Debian
sudo yum install python-devel  # CentOS/RHEL

Download StreamAlert

  1. Clone the latest stable release of StreamAlert:

git clone --branch stable https://github.com/airbnb/streamalert.git

  2. Create and activate a virtual environment:

cd streamalert
python3.7 -m venv venv
source venv/bin/activate

  3. Install the StreamAlert requirements:

pip install -r requirements.txt

  4. Run unit tests to make sure everything is installed correctly:

tests/scripts/unit_tests.sh

Configure AWS Credentials

1. Create an AWS account and an IAM user with permissions for at least the following services:

  • Athena

  • CloudTrail

  • CloudWatch Events and Logs

  • DynamoDB

  • Glue

  • IAM

  • Kinesis Firehose and Streams

  • KMS

  • Lambda

  • S3

  • SNS

  • SQS

  2. Configure your AWS credentials:

pip install --user awscli
aws configure

Deploy

Note

StreamAlert supports Terraform’s native ability to lock the remote S3 state file whenever a user is planning and applying Terraform configuration. This prevents multiple users from deploying StreamAlert at the same time, which could result in a broken state. StreamAlert will automatically create and destroy the DynamoDB table used for this locking via the command line interface. See Terraform’s documentation for more information.

  1. Set basic StreamAlert configuration options:

python manage.py configure aws_account_id 111111111111  # Replace with your 12-digit AWS account ID
python manage.py configure prefix <value>               # Choose a unique name prefix (alphanumeric characters only)

  2. Set the file_format value within the athena_partitioner_config section of conf/lambda.json to either parquet or json:

  "athena_partitioner_config": {
    "concurrency_limit": 10,
    "file_format": "parquet",
    "log_level": "info"
  }

More information can be found on the historical search page.

  3. Build the StreamAlert infrastructure for the first time:

python manage.py init

There will be multiple Terraform prompts; type “yes” at each one to continue.

Note

You only need to run python manage.py init once for any given StreamAlert deployment, although it is safe to run again if necessary.

4. At this point, StreamAlert is up and running! You can, for example, see the S3 buckets that were automatically created:

aws s3 ls | grep streamalert

You can also login to the AWS web console and see StreamAlert’s CloudWatch logs, Lambda functions, etc.

Live Test

Now let’s upload some data and trigger an alert to see StreamAlert in action! This example uses SNS for both sending the log data and receiving the alert, but StreamAlert also supports many other data sources and alert outputs.

  1. Create 2 SNS topics:

aws sns create-topic --name streamalert-test-data
aws sns create-topic --name streamalert-test-alerts

  2. Export some environment variables for easy re-use later:

export SA_REGION=us-east-1        # StreamAlert deployment region
export SA_ACCOUNT=111111111111    # AWS account ID
export SA_EMAIL=email@domain.com  # Email to receive an SNS notification

  3. Subscribe your email to the alerts SNS topic:

aws sns subscribe --topic-arn arn:aws:sns:$SA_REGION:$SA_ACCOUNT:streamalert-test-alerts \
    --protocol email --notification-endpoint $SA_EMAIL

Note

You will need to click the verification link in your email to activate the subscription.

4. Add the streamalert-test-data SNS topic as an input to the (default) prod cluster. Open conf/clusters/prod.json and change the classifier_config block to look like this:

{
  "classifier_config": {
    "enable_custom_metrics": true,
    "inputs": {
      "aws-sns": [
        "arn:aws:sns:REGION:ACCOUNTID:streamalert-test-data"
      ]
    },
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  }
}

5. Tell StreamAlert which log schemas will be sent to this input. Open conf/clusters/prod.json and change the data_sources section to look like this:

{
  "data_sources": {
    "sns": {
      "streamalert-test-data": [
        "cloudwatch"
      ]
    }
  }
}

  6. Add the alert topic as a StreamAlert output:

python manage.py output set aws-sns

Please supply a short and unique descriptor for this SNS topic: test-email

Please supply SNS topic name: streamalert-test-alerts

If you look at conf/outputs.json, you’ll notice that the SNS topic was automatically added.
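
As a rough sketch, the entry added to conf/outputs.json should map the descriptor to the topic name, along the lines of the following (the exact layout may vary by release):

{
  "aws-sns": {
    "test-email": "streamalert-test-alerts"
  }
}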

7. Configure a rule to send to the alerts topic. We will use rules/community/cloudwatch_events/cloudtrail_root_account_usage.py as an example, which alerts on any usage of the root AWS account. Change the rule decorator to:

@rule(
    logs=['cloudwatch:events'],
    req_subkeys={'detail': ['userIdentity', 'eventType']},
    outputs=['aws-sns:test-email']  # Add this line
)
def cloudtrail_root_account_usage(rec):

  8. Now we need to update StreamAlert with these changes:

# Hook the streamalert-test-data SNS topic up to the StreamAlert Classifier function
python manage.py build

# Deploy a new version of all of the Lambda functions with the updated rule and config files
python manage.py deploy

Note

Use build and deploy to apply any changes to StreamAlert’s configuration or Lambda functions, respectively. Some changes (like this example) require both.

  9. Time to test! Create a file named cloudtrail-root.json with the following contents:

{
  "account": "1234",
  "detail": {
    "eventType": "AwsConsoleSignIn",
    "userIdentity": {
      "type": "Root"
    }
  },
  "detail-type": "CloudTrail Test",
  "id": "1234",
  "region": "us-east-1",
  "resources": [],
  "source": "1.1.1.2",
  "time": "now",
  "version": "2018"
}

This is only a rough approximation of what the real log might look like, but good enough for our purposes. Then send it off to the data SNS topic:

aws sns publish --topic-arn arn:aws:sns:$SA_REGION:$SA_ACCOUNT:streamalert-test-data \
    --message "$(cat cloudtrail-root.json)"

If all goes well, an alert should arrive in your inbox within a few minutes! If not, look for any errors in the CloudWatch Logs for the StreamAlert Lambda functions.

10. After 10 minutes (the default refresh interval), the alert will also be searchable from Amazon Athena. Select your StreamAlert database in the dropdown on the left and preview the alerts table:

Query Alerts Table in Athena

(Here, my name prefix is testv2.) If no records are returned, look for errors in the Athena Partitioner function or try invoking it directly.

And there you have it! Ingested log data is parsed, classified, and scanned by the rules engine. Any resulting alerts are delivered to your configured output(s) within a matter of minutes.

Architecture

StreamAlert consists of multiple AWS components connected together and managed by Terraform.

StreamAlert Architecture

Data Lifecycle

1. Log data can come through any of the supported data sources. This includes Kinesis, S3, SNS, or using a StreamAlert App to periodically poll data from a third-party API.

2. Inbound logs are first ingested via the “classifier” Lambda function in one of your clusters. The classifier is the first and most substantial component of StreamAlert, responsible for parsing, classifying, and performing normalization on logs. The classifier(s) can also optionally forward the resulting logs from classification to Firehose for historical retention and searching via Athena. The same results also get sent to an SQS Queue for further downstream analysis.

3. The SQS Queue that the classifier function(s) send data to is then utilized by a “rules engine” Lambda function. Note that this function is not “clustered” like the classifier(s) and is global to the StreamAlert deployment. The data read from the queue is run through the defined rules, and any alerts that are triggered are sent to a DynamoDB table. Optionally, the rules engine function can read Threat Intelligence information from DynamoDB, or perform other lookup-style operations using data stored in S3.

4. The “alert merger” Lambda function regularly scans the alerts DynamoDB table. When new alerts arrive, they are either forwarded immediately (by default) or, if merge options are specified, they are bundled together with similar alerts before proceeding to the next stage.

5. The “alert processor” Lambda function is responsible for actually delivering the alert to its configured outputs. All alerts implicitly include a Firehose output, which feeds an S3 bucket that can be queried with Athena. Alerts will be retried indefinitely until they are successfully delivered, at which point they will be removed from the DynamoDB table.

6. An Athena Partitioner Lambda function runs periodically to onboard new StreamAlert data and alerts into their respective Athena databases for historical search.

Other StreamAlert components include DynamoDB tables and Lambda functions for optional rule promotion and regularly updating threat intelligence information.

Datatypes

StreamAlert supports the following datatypes:

JSON

{"type": "json"}

CSV

csv,data,example

Key-Value

type=kv data=example

Syslog

Jun 15 00:00:40 host1.mydomain.io application[pid] syslog message.

Datasources

StreamAlert supports the following services as datasources:

  • Amazon S3

  • Amazon Kinesis Data Streams

  • Amazon Simple Notification Service (SNS)

These services can accept data from:

  • Log Forwarding Agents

  • Custom Applications

  • AWS CloudTrail

  • Amazon CloudWatch Events

  • And more

To configure datasources for a cluster, see Datasource Configuration.

Amazon S3

StreamAlert supports data analysis and alerting for logs written to Amazon S3 buckets. This is achieved via Amazon S3 Event Notifications for events of type s3:ObjectCreated:*.

Example AWS use-cases:

  • AWS Config logs

  • S3 Server Access logs

Example non-AWS use-cases:

  • Host logs (syslog, auditd, osquery, …)

  • Network logs (Palo Alto Networks, Cisco, …)

  • Web Application logs (Apache, nginx, …)

  • SaaS logs (Box, GSuite, OneLogin, …)

Amazon Kinesis Data Streams

StreamAlert also utilizes Amazon Kinesis Data Streams for real-time data ingestion and analysis. By default, StreamAlert creates an Amazon Kinesis Data Stream per cluster.

Sending to Amazon Kinesis Data Streams
Log Forwarding Agents

Log forwarding agents that support Amazon Kinesis Data Streams, such as fluentd and logstash via their Kinesis output plugins, can write directly to a cluster’s stream.

Code/Applications

Custom code can send data to an Amazon Kinesis Data Stream via the AWS SDKs or the Amazon Kinesis Producer Library (KPL).
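
For example, a minimal boto3 sketch (the record and partition key are hypothetical; the default stream name is <prefix>_<cluster>_streamalert):

import json

import boto3

# Send a single record to a cluster's Kinesis Data Stream. The record must
# match a log schema configured for this stream under data_sources.
kinesis = boto3.client('kinesis', region_name='us-east-1')

record = {'key': 'type'}  # hypothetical record matching a conf/schemas/ definition

kinesis.put_record(
    StreamName='<prefix>_<cluster>_streamalert',
    Data=json.dumps(record),
    PartitionKey='streamalert-example',
)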

Amazon SNS

Amazon Simple Notification Service (SNS) is a flexible, fully managed pub/sub messaging notification service for coordinating the delivery of messages to subscribing endpoints and clients.

StreamAlert can utilize SNS as an input for processing.

Use-cases:

  • Receiving messages from other AWS services

Global Settings

Settings that apply globally for StreamAlert are stored in the conf/global.json file. This file has a few different sections to help organize the settings by purpose. These sections are described in further detail below.

Account

The account section of conf/global.json file is used to store information specifically related to the AWS account used for your StreamAlert deployment.

Configuration
{
  "account": {
    "aws_account_id": "123456789012",
    "prefix": "<prefix>",
    "region": "us-east-1"
  }
}
Options

  aws_account_id (required)
      12-digit account ID for your AWS account

  prefix (required)
      An alphanumeric and unique prefix to be used for your deployment

  region (required; default: us-east-1)
      AWS region within which you would like to deploy StreamAlert

Tip

The aws_account_id and prefix settings can be set using the CLI:

python manage.py configure aws_account_id 111111111111  # Replace with your 12-digit AWS account ID
python manage.py configure prefix <value>               # Choose a unique name prefix (alphanumeric characters only)

However, if a different region is desired, it must be changed manually.

General

The general section of conf/global.json file is used to store general information related to your StreamAlert deployment. Notably, paths to rules and matchers can be supplied here.

Configuration
{
  "general": {
    "terraform_files": [
      "/absolute/path/to/extra/terraform/file.tf"
    ],
    "matcher_locations": [
      "matchers"
    ],
    "rule_locations": [
      "rules"
    ],
    "scheduled_query_locations": [
      "scheduled_queries"
    ],
    "publisher_locations": [
      "publishers"
    ],
    "third_party_libraries": [
      "pathlib2==2.3.5"
    ]
  }
}
Options

  matcher_locations (required; default: ["matchers"])
      List of local paths where matchers are defined

  rule_locations (required; default: ["rules"])
      List of local paths where rules are defined

  scheduled_query_locations (required; default: ["scheduled_queries"])
      List of local paths where scheduled_queries are defined

  publisher_locations (required; default: ["publishers"])
      List of local paths where publishers are defined

  third_party_libraries (optional; default: ["pathlib2==2.3.5"])
      List of third party dependencies that should be installed via pip at deployment time. These are libraries needed in rules, custom code, etc. that are defined in one of the above settings

  terraform_files (optional; default: [])
      List of local paths to Terraform files that should be included as part of this StreamAlert deployment

Infrastructure

The infrastructure section of conf/global.json file is used to store information related to settings for various global resources/infrastructure components needed by StreamAlert. There are various subsections within this section, each of which is outlined below.

Alerts Firehose

By default, StreamAlert will send all alert payloads to S3 for historical retention and searching. These payloads include the original record data that triggered the alert, as well as the rule that was triggered, the source of the log, the date/time the alert was triggered, the cluster from which the log came, and a variety of other fields.

Configuration

The following alerts_firehose configuration settings can be defined within the infrastructure section of global.json:

{
  "infrastructure": {
    "alerts_firehose": {
      "bucket_name": "<prefix>-streamalerts",
      "buffer_size": 64,
      "buffer_interval": 300,
      "cloudwatch_log_retention": 14
    }
  }
}
Options

  bucket_name (optional; default: <prefix>-streamalerts)
      Bucket name to override the default name

  buffer_size (optional; default: 64 MB)
      Buffer incoming data to the specified size, in megabytes, before delivering it to S3

  buffer_interval (optional; default: 300 seconds)
      Buffer incoming data for the specified period of time, in seconds, before delivering it to S3

  cloudwatch_log_retention (optional; default: 14 days)
      Days for which to retain error logs that are sent to CloudWatch in relation to this Kinesis Firehose Delivery Stream

Alerts Table

StreamAlert utilizes a DynamoDB Table as a temporary storage mechanism when alerts are triggered from the Rules Engine. This table can be configured as necessary to scale to the throughput of your alerts.

Configuration

The following alerts_table configuration settings can be defined within the infrastructure section of global.json:

{
  "infrastructure": {
    "alerts_table": {
      "read_capacity": 10,
      "write_capacity": 10
    }
  }
}
Options

  read_capacity (optional; default: 5)
      Read capacity value to apply to the alerts DynamoDB Table

  write_capacity (optional; default: 5)
      Write capacity value to apply to the alerts DynamoDB Table

Classifier SQS

StreamAlert sends all classified logs to an SQS Queue. This queue is then read from by the Rules Engine function to perform rule analysis.

Configuration

Note

These configuration options are only available for legacy purposes and may be removed in a future release. They will typically only be needed if manually migrating from v2 to v3+.

The following classifier_sqs configuration settings can be defined within the infrastructure section of global.json:

{
  "infrastructure": {
    "classifier_sqs": {
      "use_prefix": true
    }
  }
}
Options

  use_prefix (optional; default: true)
      Whether the prefix should be prepended to the classified logs SQS Queue that is created (set to false for legacy purposes only)

Firehose (Historical Data Retention)

StreamAlert also supports sending all logs to S3 for historical retention and searching, based on the classified type of each log. Kinesis Data Firehose Delivery Streams are used to send the data to S3.

Configuration

The following firehose configuration settings can be defined within the infrastructure section of global.json:

{
  "infrastructure": {
    "firehose": {
      "enabled": true,
      "bucket_name": "<prefix>-streamalert-data",
      "buffer_size": 64,
      "buffer_interval": 300,
      "enabled_logs": {
        "osquery": {
          "enable_alarm": true
        },
        "cloudwatch:cloudtrail": {},
        "ghe": {
          "enable_alarm": true,
          "evaluation_periods": 10,
          "period_seconds": 3600,
          "log_min_count_threshold": 100000
        }
      }
    }
  }
}
Options

  enabled (required)
      If set to false, this will disable the creation of any Kinesis Firehose resources and indicate to the Classifier functions that they should not send data for retention

  use_prefix (optional; default: true)
      Whether the prefix should be prepended to Firehoses that are created (only to be used for legacy purposes)

  bucket_name (optional; default: <prefix>-streamalert-data)
      Bucket name to override the default name

  buffer_size (optional; default: 64 MB)
      Buffer incoming data to the specified size, in megabytes, before delivering it to S3

  buffer_interval (optional; default: 300 seconds)
      Buffer incoming data for the specified period of time, in seconds, before delivering it to S3

  enabled_logs (optional; default: {})
      Which classified log types to send to Kinesis Firehose from the Classifier function, along with specific settings per log type

Note

The enabled_logs object should contain log types for which Firehoses should be created. The keys in the ‘dictionary’ should reference the log type (or subtype) for which Firehoses should be created, and the value should be additional (optional) settings per log type. The following section contains more detail on these settings.

Configuring enabled_logs

The enabled_logs section of the firehose settings must explicitly specify the log types for which you would like to enable historical retention. There are two syntaxes you may use to specify log types:

  1. parent log type: osquery

  2. log subtype: osquery:differential

The former will create Firehose resources for all osquery subtypes, while the latter will only create one Firehose for specifically the osquery:differential subtype.

Since each Firehose that gets created can have additional settings applied to it, the proper way to simply enable given log types is to add items to enabled_logs as follows (note the empty JSON object as the value):

{
  "infrastructure": {
    "firehose": {
      "enabled_logs": {
        "osquery": {},
        "cloudwatch:cloudtrail": {}
      }
    }
  }
}

Each Firehose that is created can be configured with an alarm that will fire when the incoming log volume drops below a specified threshold. This is disabled by default, and can be enabled by setting enable_alarm to true within the configuration for the log type.

  enable_alarm (optional; default: false)
      If set to true, a CloudWatch Metric Alarm will be created for this log type

  evaluation_periods (optional; default: 1)
      Consecutive periods the records count threshold must be breached before triggering an alarm

  period_seconds (optional; default: 86400, i.e. 1 day)
      Period, in seconds, over which to count the IncomingRecords

  log_min_count_threshold (optional; default: 1000)
      Alarm if IncomingRecords count drops below this value in the specified period(s)

  alarm_actions (optional; default: the <prefix>_streamalert_monitoring SNS topic)
      Optional CloudWatch alarm action or list of CloudWatch alarm actions (e.g. SNS topic ARNs)

Note

See the ghe log type in the example firehose configuration above for how this can be performed.

Additional Info

When adding a log type to the enabled_logs configuration, a dedicated Firehose is created for each of the log subtypes.

For instance, suppose the following schemas are defined across one or more files in the conf/schemas directory:

{
  "cloudwatch:events": {
    "parser": "json",
    "schema": {"key": "type"}
  },
  "cloudwatch:cloudtrail": {
    "parser": "json",
    "schema": {"key": "type"}
  },
  "osquery:differential": {
    "parser": "json",
    "schema": {"key": "type"}
  },
  "osquery:status": {
    "parser": "json",
    "schema": {"key": "type"}
  }
}

Supposing also that the above enabled_logs example is used, the following Firehose resources will be created:

  • <prefix>_streamalert_cloudwatch_cloudtrail

  • <prefix>_streamalert_osquery_differential

  • <prefix>_streamalert_osquery_status

Note

Notice that there is no Firehose created for the cloudwatch:events log type. This is because this log type was not included in the enabled_logs configuration, and only the cloudwatch:cloudtrail subtype of cloudwatch was included.

Each Delivery Stream delivers data to the same S3 bucket created by the module in a prefix based on the corresponding log type:

  • arn:aws:s3:::<prefix>-streamalert-data/cloudwatch_cloudtrail/YYYY/MM/DD/data_here

  • arn:aws:s3:::<prefix>-streamalert-data/osquery_differential/YYYY/MM/DD/data_here

  • arn:aws:s3:::<prefix>-streamalert-data/osquery_status/YYYY/MM/DD/data_here

Limits

Depending on your log volume, you may need to request limit increases for Firehose:

  • Kinesis Firehose Limits

  • Kinesis Firehose Delivery Settings

Monitoring

StreamAlert can send notifications of issues with infrastructure to an SNS topic (aka “monitoring” the health of your infrastructure).

Configuration

The following monitoring configuration settings can be defined within the infrastructure section of global.json:

{
  "infrastructure": {
    "monitoring": {
      "sns_topic_name": "name-of-existing-sns-topic-to-use"
    }
  }
}
Options

  sns_topic_name (optional; default: <prefix>_streamalert_monitoring)
      Name of an existing SNS Topic to which monitoring information should be sent instead of the default one that will be created

Rule Staging

StreamAlert comes with the ability to stage rules that have not been battle tested. This feature is backed by a DynamoDB table, for which there are a few configurable options.

Configuration
{
  "infrastructure": {
    "rule_staging": {
      "cache_refresh_minutes": 10,
      "enabled": true,
      "table_read_capacity": 5,
      "table_write_capacity": 5
    }
  }
}
Options

  enabled (optional; default: false)
      Should be set to true to enable the rule staging feature

  cache_refresh_minutes (optional; default: 10)
      Maximum amount of time (in minutes) the Rules Engine function should wait to force refresh the rule staging information

  table_read_capacity (optional; default: 5)
      DynamoDB read capacity to allocate to the table that stores staging information. The default setting should be sufficient in most use cases

  table_write_capacity (optional; default: 5)
      DynamoDB write capacity to allocate to the table that stores staging information. The default setting should be sufficient in most use cases

Tip

By default, the rule staging feature is not enabled. It can be enabled with the following command:

python manage.py rule-staging enable --true

S3 Access Logging

StreamAlert will send S3 Server Access logs generated by all the buckets in your deployment to a logging bucket that is created by default. However, if you already centralize these logs in an existing bucket, its name may be provided for StreamAlert’s buckets to use.

Configuration

The following s3_access_logging configuration settings can be defined within the infrastructure section of global.json:

{
  "infrastructure": {
    "s3_access_logging": {
      "bucket_name": "name-of-existing-bucket-to-use"
    }
  }
}
Options

  bucket_name (optional; default: <prefix>-streamalert-s3-logging)
      Name of existing S3 bucket to use for logging instead of the default bucket that will be created

Terraform

StreamAlert uses Terraform to maintain its infrastructure as code, and Terraform stores its remote state in S3. By default, a bucket will be created for Terraform’s use, but an existing bucket name can also be supplied instead. The terraform section of the conf/global.json file should be used to store these settings.

Configuration
{
  "terraform": {
    "bucket_name": "<prefix>-streamalert-terraform-state",
    "state_key_name": "streamalert_state/terraform.tfstate"
  }
}
Options

  bucket_name (optional; default: <prefix>-streamalert-terraform-state)
      Name of existing S3 bucket to use for the Terraform remote state instead of the default bucket that will be created

  state_key_name (optional; default: streamalert_state/terraform.tfstate)
      Name to use as the key of the Terraform state object in S3

Cluster Settings

Inbound data is directed to one of StreamAlert’s clusters, each with its own data sources and classifier function. For many applications, one cluster may be enough. However, adding additional clusters can potentially improve performance. For example, you could have:

  • A cluster dedicated to StreamAlert apps

  • A separate cluster for each of your inbound Kinesis Data Streams

  • A separate cluster for data from each environment (prod, staging, corp, etc)

Note

Alerting and historical search components are global and not tied to a specific cluster, although alerts will indicate their originating cluster.

Each cluster is defined by its own JSON file in the conf/clusters directory. To add a new cluster, simply create a new JSON file with the cluster name and fill in your desired configuration, described below.

Changes to cluster configuration can be applied with one of the following:

python manage.py build  # Apply all changes
python manage.py build --target cloudwatch_monitoring_*  # Only apply changes to CloudWatch Monitoring module for all clusters

Required Settings

There are a few top-level settings that are required within each cluster configuration file. Notably, these settings configure the data_sources and classifier_config.

Datasource Configuration

A cluster’s classifier function knows what types of data it should parse based on defined data_sources within each cluster configuration file in the conf/clusters directory.

For background on supported data source types, read Datasources.

Note

As of release 3.0.0, data source configuration has moved from sources.json into the data_sources top level key for each of your clusters.

Each data source (kinesis, s3, or sns) contains a mapping of specific resource names (Kinesis stream names, S3 bucket names, etc) along with a list of log types that should be expected from that source.

Note

Log schemas are defined in one or more files in the conf/schemas directory. See the Schemas page for more information, or the Example Schemas page for some sample log definitions.

Each log in the list of logs instructs StreamAlert’s classifier function to attempt to parse incoming data from the given resource as that log type. The classifier will only try to parse incoming data as types that are defined for each resource. If new types of data are fed into one of the sources defined here, but this configuration is not updated, it will result in failed parses in the classifier function.

Example
{
  "data_sources": {
    "kinesis": {
      "abc_corporate_streamalert": [
        "cloudwatch",
        "cloudtrail"
      ],
      "abc_production_stream_streamalert": [
        "osquery"
      ]
    },
    "s3": {
      "abc.ghe.logs": [
        "ghe"
      ],
      "abc.hids.logs": [
        "carbonblack"
      ]
    },
    "sns": {
      "abc_sns_topic": [
        "fleet"
      ]
    }
  }
}

Important

Any data source log type that is listed must have an associated log definition within your schemas definitions.

Classifier Configuration

A cluster’s classifier function is a required component and configurable with a few settings that may be tweaked to help with performance or cost. These exist as part of the classifier_config block within each cluster configuration file in the conf/clusters directory.

Example: Basic Cluster
{
  "id": "minimal-cluster",
  "classifier_config": {
    "enable_custom_metrics": true,
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  },
  "data_sources": {
    "kinesis": {
      "abc_corporate_streamalert": [
        "cloudwatch"
      ]
    }
  }
}
Example: Classifier with SNS Inputs
{
  "id": "sns-inputs",
  "classifier_config": {
    "enable_custom_metrics": true,
    "inputs": {
      "aws-sns": [
        "arn:aws:sns:REGION:ACCOUNT:TOPIC_NAME"
      ]
    },
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  },
  "data_sources": {
    "sns": {
      "TOPIC_NAME": [
        "cloudwatch"
      ]
    }
  }
}
Options

  enable_custom_metrics (default: true)
      Enable custom metrics for the cluster

  enable_threat_intel (default: false)
      Toggle threat intel integration (beta)

  inputs (default: {})
      SNS topics which can invoke the classifier function (see example)

  log_level (default: "info")
      Lambda CloudWatch logging level

  memory
      Lambda function memory (MB)

  timeout
      Lambda function timeout (seconds)

Modules

Optional configuration settings are divided into different modules, which should be defined within the modules section of each cluster configuration file in the conf/clusters directory.

CloudTrail

StreamAlert has native support for enabling and monitoring AWS CloudTrail.

This module is implemented by terraform/modules/tf_cloudtrail.

Example: CloudTrail via S3 Events
{
  "id": "cloudtrail-s3-events",
  "classifier_config": {
    "enable_custom_metrics": true,
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  },
  "data_sources": {
    "s3": {
      "abc-prod-streamalert-cloudtrail": [
        "cloudtrail"
      ]
    }
  },
  "modules": {
    "cloudtrail": {
      "s3_settings": {
        "enable_events": true
      }
    }
  }
}

This creates a new CloudTrail and an S3 bucket for the resulting logs. Each new object in the bucket will invoke the StreamAlert classifier function via S3 events. For this data, rules should be written against the cloudtrail:events log type.

Example: CloudTrail via CloudWatch Logs
{
  "id": "cloudtrail-via-cloudwatch",
  "classifier_config": {
    "enable_custom_metrics": true,
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  },
  "data_sources": {
    "kinesis": {
      "abc_prod_streamalert": [
        "cloudwatch"
      ]
    }
  },
  "modules": {
    "cloudtrail": {
      "s3_settings": {
        "enable_events": true
      },
      "send_to_cloudwatch": true
    },
    "kinesis": {
      "streams": {
        "retention": 96,
        "shards": 10
      }
    },
    "kinesis_events": {
      "batch_size": 10,
      "enabled": true
    }
  }
}

This also creates the CloudTrail and S3 bucket, but the CloudTrail logs are now also delivered to a CloudWatch Logs group, which forwards them to a Kinesis stream via a CloudWatch Logs Subscription Filter. This can scale to higher throughput, since StreamAlert does not have to download potentially very large files from S3. In this case, rules should be written against the cloudwatch:cloudtrail log type.

Options

  enabled (default: true)
      Toggle the cloudtrail module

  enable_logging (default: true)
      Toggle to false to pause logging to the CloudTrail

  exclude_home_region_events (default: false)
      Ignore events from the StreamAlert deployment region. This only has an effect if send_to_cloudwatch is set to true

  is_global_trail (default: true)
      If true, the CloudTrail is applied to all regions

  send_to_cloudwatch (default: false)
      Enable CloudTrail delivery to CloudWatch Logs. Logs sent to CloudWatch Logs are forwarded to this cluster’s Kinesis stream for processing. If this is enabled, the enable_events option within s3_settings should be disabled to avoid duplicative processing

  cloudwatch_destination_arn (default: computed from the CloudWatch Logs Destination module)
      CloudWatch Destination ARN used for forwarding data to this cluster’s Kinesis stream. The default value can be overridden here with a different CloudWatch Logs Destination ARN

  send_to_sns (default: false)
      Create an SNS topic to which notifications should be sent when CloudTrail puts a new object in the S3 bucket. The topic name will be the same as the S3 bucket name

  allow_cross_account_sns (default: false)
      Allow account IDs specified in the cross_account_ids array within the s3_settings (see below) to also send SNS notifications to the created SNS Topic

  s3_settings (default: None)
      Configuration options for CloudTrail related to S3. See the S3 Options section below for details

S3 Options

The cloudtrail module has a subsection of s3_settings, which contains options related to S3.

  cross_account_ids (default: [])
      Grant write access to the CloudTrail S3 bucket for these account IDs. The primary (deployment) account ID will be added to this list

  enable_events (default: false)
      Enable S3 events for the logs sent to the S3 bucket. These will invoke this cluster’s classifier for every new object in the CloudTrail S3 bucket

  ignore_digest (default: true)
      When enable_events is enabled, setting ignore_digest to false will also process S3 files created within the AWSLogs/<account-id>/CloudTrail-Digest prefix

  bucket_name (default: prefix-cluster-streamalert-cloudtrail)
      Name of the S3 bucket to be used for the CloudTrail logs; can be overridden

  event_selector_type (default: "")
      An S3 event selector to enable object level logging for the account’s S3 buckets. Choices are: “ReadOnly”, “WriteOnly”, “All”, or “”, where “” disables object level logging for S3

CloudWatch Events

StreamAlert supports ingestion of events published to CloudWatch Events for processing.

This module is implemented by terraform/modules/tf_cloudwatch_events.

Note

The Kinesis module must also be enabled.

Example
{
  "id": "cloudwatch-events-example",
  "classifier_config": {
    "enable_custom_metrics": true,
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  },
  "data_sources": {
    "kinesis": {
      "abc_prod_streamalert": [
        "cloudwatch"
      ]
    }
  },
  "modules": {
    "cloudwatch_events": {
      "event_pattern": {
        "account": [
          "123456789012"
        ],
        "detail-type": [
          "EC2 Instance Launch Successful",
          "EC2 Instance Launch Unsuccessful",
          "EC2 Instance Terminate Successful",
          "EC2 Instance Terminate Unsuccessful"
        ]
      },
      "cross_account": {
        "accounts": {
          "123456789012": [
            "us-east-1"
          ]
        },
        "organizations": {
          "o-aabbccddee": [
            "us-east-1"
          ]
        }
      }
    },
    "kinesis": {
      "streams": {
        "retention": 96,
        "shards": 10
      }
    },
    "kinesis_events": {
      "batch_size": 100,
      "enabled": true
    }
  }
}

This creates a CloudWatch Events Rule that will publish all events that match the provided event_pattern to the Kinesis Stream for this cluster. Note in the example above that a custom event_pattern is supplied, but it may be omitted entirely. To override the default event_pattern (shown below), a value of None or {} may also be supplied to capture all events, regardless of which account the logs came from. In this case, rules should be written against the cloudwatch:events log type.

Options

  event_pattern (default: {"account": ["<account-id>"]})
      The CloudWatch Events pattern to control what is sent to Kinesis

  cross_account (default: None)
      Configuration options to enable cross account access for specific AWS Accounts and Organizations. See the Cross Account Options section below for details

Cross Account Options

The cross_account section of the cloudwatch_events module has two subsections, outlined here. Usage of these is also shown in the example above.

  accounts (default: None)
      A mapping of account IDs and regions for which cross account access should be enabled. Example: {"123456789012": ["us-east-1"], "234567890123": ["us-west-2"]}

  organizations (default: None)
      A mapping of organization IDs and regions for which cross account access should be enabled. Example: {"o-aabbccddee": ["us-west-2"]}

CloudWatch Logs

StreamAlert makes it easy to ingest CloudWatch Logs from any AWS account. A common use case is to ingest and scan CloudTrail from multiple AWS accounts (delivered via CloudWatch Logs), but you could also ingest any application logs delivered to CloudWatch.

Note

The Kinesis module must also be enabled.

This module is implemented by terraform/modules/tf_cloudwatch_logs_destination.

Example
{
  "id": "cloudwatch-logs-example",
  "classifier_config": {
    "enable_custom_metrics": true,
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  },
  "data_sources": {
    "kinesis": {
      "abc_prod_streamalert": [
        "cloudwatch"
      ]
    }
  },
  "modules": {
    "cloudwatch_logs_destination": {
      "cross_account_ids": [
        "111111111111"
      ],
      "enabled": true,
      "regions": [
        "ap-northeast-1",
        "ap-northeast-2",
        "ap-southeast-2"
      ]
    },
    "kinesis": {
      "streams": {
        "retention": 96,
        "shards": 10
      }
    },
    "kinesis_events": {
      "batch_size": 100,
      "enabled": true
    }
  }
}

This creates an IAM role for CloudWatch subscriptions, authorized to gather logs from the StreamAlert account as well as account 111111111111, in all regions except Asia-Pacific.

Once you have applied this change to enable StreamAlert to subscribe to CloudWatch logs, you need to create a subscription filter in the producer account to actually deliver the logs, optionally with Terraform. The CloudWatch logs destination ARN will be arn:aws:logs:REGION:STREAMALERT_ACCOUNT:destination:streamalert_CLUSTER_cloudwatch_to_kinesis.
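
As a sketch, the producer-account side might look like the following with boto3 (the log group and filter name are hypothetical; REGION, STREAMALERT_ACCOUNT, and CLUSTER must match your deployment):

import boto3

logs = boto3.client('logs', region_name='REGION')

# Subscribe an existing log group in the producer account to the
# StreamAlert CloudWatch Logs destination.
logs.put_subscription_filter(
    logGroupName='/my/application/logs',
    filterName='streamalert-delivery',
    filterPattern='',  # an empty pattern forwards every event
    destinationArn=(
        'arn:aws:logs:REGION:STREAMALERT_ACCOUNT:'
        'destination:streamalert_CLUSTER_cloudwatch_to_kinesis'
    ),
)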

Options

  enabled (default: true)
      Toggle the cloudwatch_logs_destination module

  cross_account_ids (default: [])
      Authorize StreamAlert to gather logs from these accounts

  excluded_regions (default: [])
      Do not create CloudWatch Log destinations in these regions

CloudWatch Monitoring

To ensure data collection is running smoothly, we recommend enabling CloudWatch metric alarms to monitor the health of the classifier Lambda function(s) and, if applicable, the respective Kinesis stream.

This module is implemented by terraform/modules/tf_monitoring.

Example
{
  "id": "cloudwatch-monitoring-example",
  "classifier_config": {
    "enable_custom_metrics": true,
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  },
  "data_sources": {
    "kinesis": {
      "abc_prod_streamalert": [
        "cloudwatch"
      ]
    }
  },
  "modules": {
    "cloudwatch_monitoring": {
      "enabled": true,
      "kinesis_alarms_enabled": true,
      "lambda_alarms_enabled": true,
      "settings": {
        "lambda_invocation_error_threshold": 0,
        "lambda_throttle_error_threshold": 0,
        "kinesis_iterator_age_error_threshold": 1000000,
        "kinesis_write_throughput_exceeded_threshold": 10
      }
    }
  }
}

This enables both the Kinesis and Lambda alarms and illustrates how the alarm thresholds can be tuned. A total of 5 alarms will be created:

  • Classifier Lambda function invocation errors

  • Classifier Lambda function throttles

  • Classifier Lambda function iterator age, applicable only for Kinesis invocations

  • Kinesis iterator age

  • Kinesis write exceeded

Options

  enabled (default: false)
      Toggle the cloudwatch_monitoring module

  kinesis_alarms_enabled (default: true)
      Toggle the Kinesis-specific metric alarms

  lambda_alarms_enabled (default: true)
      Toggle the Lambda-specific metric alarms

  settings (default: {})
      Alarm-specific settings (see below)

There are three settings for a CloudWatch alarm:

  • Period is the length of time to evaluate the metric

  • Evaluation Periods is the number of periods over which to evaluate the metric

  • Threshold is the upper or lower bound after which the alarm will trigger

The following options are available in the settings dictionary:

  lambda_invocation_error_threshold (default: 0)
  lambda_invocation_error_evaluation_periods (default: 1)
  lambda_invocation_error_period (default: 300)
  lambda_throttle_error_threshold (default: 0)
  lambda_throttle_error_evaluation_periods (default: 1)
  lambda_throttle_error_period (default: 300)
  lambda_iterator_age_error_threshold (default: 1000000)
  lambda_iterator_age_error_evaluation_periods (default: 1)
  lambda_iterator_age_error_period (default: 300)
  kinesis_iterator_age_error_threshold (default: 1000000)
  kinesis_iterator_age_error_evaluation_periods (default: 1)
  kinesis_iterator_age_error_period (default: 300)
  kinesis_write_throughput_exceeded_threshold (default: 10)
  kinesis_write_throughput_exceeded_evaluation_periods (default: 6)
  kinesis_write_throughput_exceeded_period (default: 300)

Receiving CloudWatch Metric Alarms

By default, StreamAlert automatically creates a <prefix>_streamalert_monitoring SNS topic that receives CloudWatch metric alarm notifications. If you would instead like to use an existing SNS topic for metric alarms, see the Monitoring configuration settings for how to set this up.

In either case, to receive notifications for metric alarms, simply subscribe to the SNS topic.
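
For example, a minimal boto3 sketch subscribing a hypothetical email address to the default monitoring topic:

import boto3

sns = boto3.client('sns', region_name='us-east-1')

# The topic name below is the default created by StreamAlert.
sns.subscribe(
    TopicArn='arn:aws:sns:us-east-1:ACCOUNT_ID:<prefix>_streamalert_monitoring',
    Protocol='email',
    Endpoint='security-team@example.com',
)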

Kinesis (Data Streams)

This module creates a Kinesis Data Stream in the cluster, which is the most common approach for StreamAlert data ingestion. In fact, the CloudTrail, CloudWatch Logs, and VPC Flow Logs cluster modules all rely on Kinesis streams for data delivery.

Each Kinesis stream is a set of shards, which in aggregate determine the total data capacity of the stream. Indeed, this is the primary motivation for StreamAlert’s cluster design - each cluster can have its own data stream whose shard counts can be configured individually.

This module is implemented by terraform/modules/tf_kinesis_streams.

Example
{
  "id": "kinesis-example",
  "classifier_config": {
    "enable_custom_metrics": true,
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  },
  "data_sources": {
    "kinesis": {
      "abc_prod_streamalert": [
        "cloudwatch"
      ]
    }
  },
  "modules": {
    "kinesis": {
      "streams": {
        "create_user": true,
        "retention": 24,
        "shard_level_metrics": [
          "IncomingBytes",
          "IncomingRecords",
          "IteratorAgeMilliseconds",
          "OutgoingBytes",
          "OutgoingRecords",
          "WriteProvisionedThroughputExceeded"
        ],
        "shards": 1,
        "terraform_outputs": [
          "user_arn",
          "access_key_id",
          "secret_key"
        ]
      }
    },
    "kinesis_events": {
      "batch_size": 100,
      "enabled": true
    }
  }
}

This creates a Kinesis stream and an associated IAM user and hooks up stream events to the StreamAlert classifier function in this cluster. The terraform_outputs section instructs Terraform to print the IAM User’s ARN, along with the access key ID and secret key for the newly created user.

Options

The kinesis module expects a single key (streams) whose value is a dictionary with the following options:

  create_user (default: false)
      Create an IAM user authorized to PutRecords on the stream

  retention
      Length of time (hours) data records remain in the stream

  shard_level_metrics (default: [])
      Enable these enhanced shard-level metrics

  shards
      Number of shards (determines stream data capacity)

  trusted_accounts (default: [])
      Authorize these account IDs to assume an IAM role which can write to the stream

  stream_name (default: <prefix>_<cluster>_streamalert)
      Optional custom name for the stream that will be created

Kinesis Scaling

If the need arises to scale a Kinesis Stream, the process below is recommended.

First, update the Kinesis Stream shard count with the following command:

aws kinesis update-shard-count \
  --stream-name <prefix>_<cluster>_streamalert_kinesis \
  --target-shard-count <new_shard_count> \
  --scaling-type UNIFORM_SCALING

AWS CLI reference for update-shard-count

Repeat this process for each cluster in your deployment.

Note

It may take several minutes, or longer, for new shards to be created.

Then, update each respective cluster configuration file with the updated shard count.

Finally, apply the Terraform changes to ensure a consistent state.

python manage.py build --target kinesis

Kinesis Events

The Kinesis Events module connects a Kinesis Stream to the classifier Lambda function.

Note

The Kinesis module must also be enabled.

This module is implemented by terraform/modules/tf_kinesis_events.

Options

  batch_size (default: 100)
      Max records the classifier function can receive per invocation

  enabled (default: false)
      Toggle the kinesis events on and off

VPC Flow Logs

VPC Flow Logs capture information about the IP traffic going to and from an AWS VPC.

When writing rules for this data, use the cloudwatch:flow_logs log source.

Note

The Kinesis module must also be enabled.

This module is implemented by terraform/modules/tf_flow_logs.

Example
{
  "id": "prod",
  "classifier_config": {
    "enable_custom_metrics": true,
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  },
  "data_sources": {
    "kinesis": {
      "abc_prod_streamalert": [
        "cloudwatch:flow_logs"
      ]
    }
  },
  "modules": {
    "flow_logs": {
      "enis": [],
      "enabled": true,
      "subnets": [
        "subnet-12345678"
      ],
      "vpcs": [
        "vpc-ed123456"
      ]
    },
    "kinesis": {
      "streams": {
        "retention": 24,
        "shards": 10
      }
    },
    "kinesis_events": {
      "batch_size": 2,
      "enabled": true
    }
  }
}

This creates the <prefix>_prod_streamalert_flow_logs CloudWatch Log Group, adds flow logs to the specified subnet, eni, and vpc IDs with the log group as their target, and adds a CloudWatch Logs Subscription Filter to that log group to send to Kinesis for consumption by StreamAlert.

Options

  enabled (default: true)
      Toggle the flow_logs module

  flow_log_filter (default: [version, account, eni, source, destination, srcport, destport, protocol, packets, bytes, windowstart, windowend, action, flowlogstatus])
      Filter pattern applied to the flow logs that are forwarded

  log_retention (default: 7)
      Days for which logs should be retained in the log group

  enis (default: [])
      Add flow logs for these ENIs

  subnets (default: [])
      Add flow logs for these VPC subnet IDs

  vpcs (default: [])
      Add flow logs for these VPC IDs

Note

One of the following must be set for this module to have any result: enis, subnets, or vpcs

S3 Events

You can enable S3 event notifications on any of your S3 buckets to invoke the StreamAlert classifier function. When the StreamAlert classifier function receives this notification, it downloads the object from S3 and runs each record through the classification logic.

This module is implemented by terraform/modules/tf_s3_events.

Example
{
  "id": "s3-events-example",
  "classifier_config": {
    "enable_custom_metrics": true,
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "timeout": 60
  },
  "data_sources": {
    "s3": {
      "bucket_name_01": [
        "cloudtrail"
      ],
      "bucket_name_02": [
        "cloudtrail"
      ]
    }
  },
  "modules": {
    "s3_events": {
      "bucket_name_01": [
        {
          "filter_prefix": "AWSLogs/1234",
          "filter_suffix": ".log"
        },
        {
          "filter_prefix": "AWSLogs/5678"
        }
      ],
      "bucket_name_02": []
    }
  }
}

This configures the two buckets (bucket_name_01 and bucket_name_02) to notify the classifier function in this cluster when new objects arrive in the bucket at the specified (optional) prefix(es), provided the objects have the specified (optional) suffix(es). Additionally, this will authorize the classifier to download objects from each bucket.

Options

The s3_events module expects a dictionary/map of bucket names, where the value for each key (bucket name) is a list of maps. Each map in the list can include optional prefixes (filter_prefix) and suffixes (filter_suffix) to which the notification should be applied. The mere existence of a bucket name in this map within this module implicitly enables event notifications for said bucket. Note that the value specified for the map of prefixes and suffixes can be an empty list ([]). An empty list will enable event notifications for all objects created in the bucket by default.

See the above example for how prefixes/suffixes can be (optionally) specified (as in “bucket_name_01”) and how to use the empty list to enable bucket-wide notifications (as in “bucket_name_02”).

Schemas

Log schemas are required by StreamAlert to detect the correct log type of an incoming record.

Schemas are defined in conf/schemas/<log-type>.json and used by rules to determine which records are analyzed.

They can be defined in one single file, or multiple files, ideally split by each log type, e.g. carbonblack.json.

They represent the structure of a given log in the form of key/value pairs.

Each key in a schema corresponds to the name of a field referenced by rules and its value represents the data type the field is cast into.

Note

Ordering is strict for the csv parser.
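
For example, with a hypothetical csv log, the schema fields must be declared in the same order as the columns appear in the record:

{
  "example_csv_log": {
    "parser": "csv",
    "schema": {
      "date": "string",
      "host": "string",
      "message": "string"
    }
  }
}

A record such as 2021-06-15,host1.mydomain.io,connection established would then parse date from the first column, host from the second, and message from the third.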

Log Options

  parser (required)
      The name of the parser to use for a given log’s data-type. Options include json, csv, kv, or syslog

  schema (required)
      A map of key/value pairs of the name of each field with its type

  configuration (optional)
      Configuration options specific to this log type (see the settings below for more information)

Settings in configuration

The below settings may optionally be defined within the configuration block.

  delimiter
      For use with key/value or csv logs to identify the delimiter character for the log

  envelope_keys
      Used with nested records to identify keys that are at a higher level than the nested records, but still hold some value and should be stored

  json_path
      Path to nested records to be ‘extracted’ from within a JSON object

  json_regex_key
      The key name containing a JSON string to parse. This will become the final record

  log_patterns
      Various patterns to enforce within a log given provided fields

  optional_top_level_keys
      Keys that may or may not be present in a log being parsed

  optional_envelope_keys
      Keys that may or may not be present in the envelope of a log being parsed

  priority
      Integer value used to set the order in which schemas are tested against data, in the range 0..N, where 0 is the highest priority and N is the lowest

  separator
      For use with key/value logs to identify the separator character for the log

Writing Schemas

Schema values are strongly typed and enforced.

Normal types:

  • string - 'example'

  • integer - 0

  • float - 0.0

  • boolean - true/false

Special types:

  • {} - zero or more key/value pairs of any type

  • [] - zero or more elements of any type

Basic Schema Definition
Example Schema
{
  "example_log_name": {
    "parser": "json",
    "schema": {
      "field_1": "string",
      "field_2": "integer",
      "field_3": "boolean",
      "field_4": "float",
      "field_5": [],
      "field_6": {}
    }
  }
}
Example rule
@rule(logs=['example_log_name'],              # the log_name as defined above
      outputs=['slack'])
def example_rule(rec):
  """Description of the rule"""
  return (
    rec['field_1'] == 'string-value' and      # fields as defined in the schema above
    rec['field_2'] < 5 and
    'random-key-name' in rec['field_6']
  )
Casting Normal Types
Example Schema
{
  "example_log_name": {
    "parser": "json",
    "schema": {
      "field_1": "string",
      "field_2": "integer",
      "field_3": "boolean"
    }
  }
}
Example Log Before Parse
{
  "field_1": "test-string",
  "field_2": "100",
  "field_3": "true"
}
Example Log After Parsing
{
  'field_1': 'test-string',
  'field_2': 100,
  'field_3': True
}
Example rule

Notice the boolean comparison for the newly-cast types.

@rule(logs=['example_log_name'],
      outputs=['example_output'])
def example_rule(rec):
  return (
    rec['field_2'] == 100 and
    rec['field_3'] is not False
  )
Casting Special Types

Schemas can be as rigid or permissive as you want (see Example: osquery).

Usage of the special types normally indicates a loose schema, in that not every part of the incoming data is described.

Example Schema
{
  "example_log_name": {
    "parser": "json",
    "schema": {
      "field_1": "string",
      "field_2": "integer",
      "field_3": {}
    }
  }
}
Example Log Before Parse
{
  "field_1": "test-string",
  "field_2": "100",
  "field_3": {
    "data": "misc-data",
    "time": "1491584265"
  }
}
Example Log After Parsing
{
  'field_1': 'test-string',
  'field_2': 100,
  'field_3': {
    'data': 'misc-data',
    'time': '1491584265'
  }
}
Example Rule with a Loose Schema
@rule(logs=['example_log_name'],
      outputs=['example_output'],
      req_subkeys={'field_3': ['time']})
def example_rule_2(rec):
  return (
    rec['field_2'] == 100 and
    last_hour(int(rec['field_3']['time']))
  )

Also note the usage of req_subkeys above.

This keyword argument ensures that the parsed log contains the required subkeys; in this case, that rec['field_3'] contains a time subkey.

Optional Top Level Keys

If incoming logs occasionally include/exclude certain fields, this can be expressed in the configuration settings as optional_top_level_keys.

The value of optional_top_level_keys should be an array, with entries corresponding to the actual key in the schema that is optional. Any keys specified in this array should also be included in the defined schema.

If any of the optional_top_level_keys do not exist in the log being parsed, defaults are appended to the parsed log depending on the declared value.

Example Schema
{
  "test_log_type_json": {
    "parser": "json",
    "schema": {
      "key1": [],
      "key2": "string",
      "key3": "integer",
      "key4": "boolean",
      "key5": "string"
    },
    "configuration": {
      "optional_top_level_keys": [
        "key4",
        "key5"
      ]
    }
  }
}
Example Log Before Parse
{
  "key1": [
    1,
    2,
    3
  ],
  "key2": "test",
  "key3": 100,
  "key4": true
}
Example Log After Parsing
{
  'key1': [1, 2, 3],
  'key2': 'test',
  'key3': 100,
  'key4': True,     # default is overridden by parsed log
  'key5': ''        # default value for string is inserted
}
JSON Parsing
Options
{
  "log_name": {
    "parser": "json",
    "schema": {
      "field": "type",
      "field...": "type..."
    },
    "configuration": {
      "json_path": "jsonpath expression",
      "json_regex_key": "key with nested JSON string to extract",
      "envelope_keys": {
        "field": "type",
        "field...": "type..."
      }
    }
  }
}

Note

Options related to nested JSON are defined within configuration. The json_path key should hold the JSON path to the records, while envelope_keys is utilized to capture keys in the root of our nested structure.

Nested JSON

Normally, a log contains all fields to be parsed at the top level:

{
  "example": 1,
  "host": "myhostname.domain.com",
  "time": "10:00 AM"
}

In some cases, the fields to be parsed and analyzed may be nested several layers into the data:

{
  "logs": {
    "results": [
      {
        "example": 1,
        "host": "jumphost-1.domain.com",
        "time": "11:00 PM"
      },
      {
        "example": 2,
        "host": "jumphost-2.domain.com",
        "time": "12:00 AM"
      }
    ]
  },
  "id": 1431948983198,
  "application": "my-app"
}

To extract these nested records, use the configuration option json_path:

{
  "log_name": {
    "parser": "json",
    "schema": {
      "example": "integer",
      "host": "string",
      "time": "string"
    },
    "configuration": {
      "json_path": "logs.results[*]"
    }
  }
}
Log Patterns

Log patterns provide the ability to differentiate log schemas that are nearly identical.

They can be added by using the configuration option log_patterns.

Log patterns are a collection of key/value pairs where the key is the name of the field, and the value is a list of expressions the log parser will search for in said field of the log.

If any of the log patterns listed exists in a specific field, the parser will consider the data valid.

This feature is helpful to reduce false positives, as it provides the ability to match a schema only if specific values are present in a log.

Wildcard log patterns are supported using the * or ? symbols, as shown below.

Example schema:

{
  "log_name": {
    "schema": {
      "computer_name": "string",
      "hostname": "string",
      "instance_id": "string",
      "process_id": "string",
      "message": "string",
      "timestamp": "float",
      "type": "string"
    },
    "parser": "json",
    "configuration": {
      "log_patterns": {
        "type": [
          "*bad.log.type*"
        ]
      }
    }
  }
}

Example logs:

{
  "computer_name": "test-server-name",
  "hostname": "okay_host",
  "instance_id": "95909",
  "process_id": "82571",
  "message": "this is not important info",
  "timestamp": "1427381694.88",
  "type": "good.log.type.value"
}

Note

The above log will not match the schema above, because its type field does not contain the pattern *bad.log.type*.

{
  "computer_name": "fake-server-name",
  "hostname": "bad_host",
  "instance_id": "589891",
  "process_id": "72491",
  "message": "this is super important info",
  "timestamp": "1486943917.12",
  "type": "bad.log.type.value"
}

Note

The above log will match the schema above, because its type field contains the pattern *bad.log.type*.

Envelope Keys

Continuing with the example above, if the id and application keys in the root of the log are needed for analysis, they can be added by using the configuration option envelope_keys:

{
  "log_name": {
    "parser": "json",
    "schema": {
      "example": "integer",
      "host": "string",
      "time": "string"
    },
    "configuration": {
      "json_path": "logs.results[*]",
      "envelope_keys": {
        "id": "integer",
        "application": "string"
      }
    }
  }
}

The resultant parsed records:

[
  {
    "example": 1,
    "host": "jumphost-1.domain.com",
    "time": "11:00 PM",
    "streamalert:envelope_keys": {
      "id": 1431948983198,
      "application": "my-app"
    }
  },
  {
    "example": 2,
    "host": "jumphost-2.domain.com",
    "time": "12:00 AM",
    "streamalert:envelope_keys": {
      "id": 1431948983198,
      "application": "my-app"
    }
  }
]
Nested JSON Regex Parsing

When using forwarders such as fluentd, logstash, or rsyslog, log data may be wrapped with additional context keys:

{
  "collector": "my-app-1",
  "date-collected": "Oct 12, 2017",
  "@timestamp": "1507845487",
  "data": "<0> program[pid]: {'actual': 'data is here'}"
}

To parse the nested JSON string as the record, use the following schema options:

{
  "json:regex_key_with_envelope": {
    "schema": {
      "actual": "string"
    },
    "parser": "json",
    "configuration": {
      "envelope_keys": {
        "collector": "string",
        "date-collected": "string",
        "@timestamp": "string"
      },
      "json_regex_key": "data"
    }
  }
}

Optionally, you can omit envelope keys if they provide no value in rules.
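If the envelope keys are omitted, a minimal sketch of the same schema (the log name here is illustrative) reduces to:

{
  "json:regex_key_no_envelope": {
    "schema": {
      "actual": "string"
    },
    "parser": "json",
    "configuration": {
      "json_regex_key": "data"
    }
  }
}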

CSV Parsing
Options
{
  "csv_log_name": {
    "parser": "csv",
    "schema": {
      "field": "type",
      "field...": "type..."
    },
    "configuration": {
      "delimiter": ","
    }
  }
}

Note

A custom delimiter is specified within configuration above.

By default, the csv parser will use , as the delimiter.

The configuration setting is optional.

Ordering of the fields within schema is strict: it must match the order of columns in the incoming records.
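For example, a simple three-column log such as "1485729127","john_adams","pass" could be described with a schema like the following sketch (the log name is illustrative), listing fields in the same order as the columns:

{
  "example_simple_csv": {
    "parser": "csv",
    "schema": {
      "time": "integer",
      "user": "string",
      "result": "string"
    }
  }
}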

Nested CSV

Some CSV logs have nested fields.

Example logs:

"1485729127","john_adams","memcache,us-east1"
"1485729127","john_adams","mysqldb,us-west1"

You can support this with a schema similar to the following:

{
  "example_csv_with_nesting": {
    "parser": "csv",
    "schema": {
      "time": "integer",
      "user": "string",
      "message": {
        "role": "string",
        "region": "string"
      }
    }
  }
}
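Under this schema, the first example row above should parse roughly as follows (a sketch of the expected result, with the nested message field split into its sub-fields):

{
  'time': 1485729127,
  'user': 'john_adams',
  'message': {
    'role': 'memcache',
    'region': 'us-east1'
  }
}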
KV Parsing
Options
{
  "kv_log_name": {
    "parser": "kv",
    "schema": {
      "field": "type",
      "field...": "type..."
    },
    "configuration": {
      "delimiter": " ",
      "separator": "="
    }
  }
}

Note

The delimiter and separator keys within configuration indicate the values to use for delimiter and field separator, respectively.

By default, the kv parser will use a single space as the delimiter and = as the field separator.

The configuration setting is optional.

Example schema:

{
  "example_kv_log_type": {
    "parser": "kv",
    "schema": {
      "time": "integer",
      "user": "string",
      "result": "string"
    }
  }
}

Example log:

"time=1039395819 user=bob result=pass"
Syslog Parsing
Options
{
  "syslog_log_name": {
    "parser": "syslog",
    "schema": {
      "timestamp": "string",
      "host": "string",
      "application": "string",
      "message": "string"
    }
  }
}

The syslog parser has no configuration options.

The schema is also static for this parser because of the regex used to parse records.

Log Format

The syslog parser matches events with the following format:

timestamp(Month DD HH:MM:SS) host application: message

Example logs:

Jan 10 19:35:33 vagrant-ubuntu-trusty-64 sudo: session opened for root
Jan 10 19:35:13 vagrant-ubuntu-precise-32 ssh[13941]: login for user
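Given the static schema above, the first example log should parse roughly as follows (a sketch of the expected result):

{
  'timestamp': 'Jan 10 19:35:33',
  'host': 'vagrant-ubuntu-trusty-64',
  'application': 'sudo',
  'message': 'session opened for root'
}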
More Examples

For a list of schema examples, see Example Schemas

Deployment

Make sure you’ve completed the Getting Started instructions prior to continuing.

Initial Build

To initialize StreamAlert:

python manage.py init

This will perform the following:

  • Create S3 buckets and encryption keys.

  • Create AWS Lambda functions.

  • Build declared infrastructure in the Terraform files.

  • Deploy initial production AWS Lambda versions.

Type yes at each prompt.

Continuous Deployment

As new rules, sources, or outputs are added to StreamAlert, new versions of the AWS Lambda functions must be deployed for changes to become effective.

To accomplish this, manage.py contains a deploy command.

To deploy new changes for all AWS Lambda functions:

python manage.py deploy

Optionally, to deploy changes for only a specific AWS Lambda function:

python manage.py deploy --functions alert
python manage.py deploy --functions alert_merger
python manage.py deploy --functions apps
python manage.py deploy --functions athena
python manage.py deploy --functions classifier
python manage.py deploy --functions rule
python manage.py deploy --functions rule_promo
python manage.py deploy --functions threat_intel_downloader

To apply infrastructure level changes (additional Kinesis Shards, new CloudTrails, etc), run:

python manage.py build

To apply specific changes and speed up a Terraform run, use the list-targets command and the build command with the --target option:

python manage.py list-targets

  Target                                                                                Type
  ----------------------------------------------------------------------------------------------
  ...
  classifier_prod_iam                                                                   module
  classifier_prod_lambda                                                                module
  cloudwatch_monitoring_prod                                                            module
  kinesis_events_prod                                                                   module
  kinesis_prod                                                                          module
  metric_filters_Classifier_FailedParses_PROD                                           module
  metric_filters_Classifier_FirehoseFailedRecords_PROD                                  module
  metric_filters_Classifier_FirehoseRecordsSent_PROD                                    module
  ...

python manage.py build --target cloudwatch_monitoring_prod        # apply to single module
python manage.py build --target kinesis_prod classifier_prod_iam  # apply to two modules
python manage.py build --target metric_filters_Classifier_*_PROD  # apply to three modules
Monitoring Functions

StreamAlert clusters contain a module to create CloudWatch Alarms for monitoring AWS Lambda invocation errors.

These ensure that the currently running code is reliable. To access these monitors, log in to the AWS Console, go to CloudWatch, and then click Alarms.

Rollback

StreamAlert Lambda functions are invoked via a production alias that can be easily rolled back to point to the previous version:

python manage.py rollback --functions rule
python manage.py rollback --functions alert
python manage.py rollback

This is helpful to quickly revert changes to Lambda functions, e.g. if a bad rule was deployed.

Rules

  • Rules contain data analysis and alerting logic

  • Rules are written in native Python, not a proprietary language

  • A Rule can utilize any Python function or library

  • A Rule can be run against multiple log sources if desired

  • Rules can be isolated into defined environments/clusters

  • Rule alerts can be sent to one or more outputs, like S3, PagerDuty or Slack

  • Rules can be unit and integration tested

Getting Started

All rules are located in the rules/ directory.

Within this directory are two folders: community/ and default/.

community/ rules are publicly shared from StreamAlert contributors.

default/ can be used as a generic container for rules files.

You may create any folder structure desired, as all rules folders are imported recursively. Here are some examples:

  • rules/intrusion-detection/malware.py

  • rules/compliance/pci.py

  • rules/default/infrastructure.py

Note

If you create additional folders within the rules directory, be sure to include a blank __init__.py file.

Overview

A StreamAlert rule is a Python method that takes a parsed log record (dictionary) and returns True or False. A return value of True means an alert will be generated.

Example: The Basics

The simplest possible rule looks like this:

from streamalert.shared.rule import rule

@rule(logs=['cloudwatch:events'])
def all_cloudwatch_events(record):
    """Minimal StreamAlert rule: alert on all CloudWatch events"""
    return True

This rule will be evaluated against all inbound logs that match the cloudwatch:events schema defined in a schema file in the conf/schemas directory, i.e., conf/schemas/cloudwatch.json. In this case, all CloudWatch events will generate an alert, which will be sent to the alerts Athena table.

Example: Logic & Outputs

Let’s modify the rule to page the security team if anyone ever uses AWS root credentials:

from streamalert.shared.rule import rule

@rule(logs=['cloudwatch:events'], outputs=['pagerduty:csirt', 'slack:security'])
def cloudtrail_root_account_usage(record):
    """Page security team for any usage of AWS root account"""
    return (record['detail']['userIdentity']['type'] == 'Root'
            and record['detail']['userIdentity'].get('invokedBy') is None
            and record['detail']['eventType'] != 'AwsServiceEvent')

Now, any AWS root account usage is reported to PagerDuty, Slack, and the aforementioned Athena table. In order for this to work, your datasources and outputs must be configured so that:

  • CloudTrail logs are being sent to StreamAlert via CloudWatch events

  • The pagerduty:csirt and slack:security outputs have the proper credentials

Example: More Rule Options

The previous example suffers from the following problems:

  • It only works for CloudTrail data sent via CloudWatch

  • A single legitimate root login may generate hundreds of alerts in the span of a few minutes

  • There is no distinction between different AWS account IDs

We can generalize the rule to alleviate these issues:

from rules.helpers.base import get_first_key  # Find first key recursively in record
from streamalert.shared.rule import rule

# This could alternatively be defined in matchers/matchers.py to be shareable
_PROD_ACCOUNTS = {'111111111111', '222222222222'}

def prod_account(record):
    """Match logs for one of the production AWS accounts"""
    return (
        record.get('account') in _PROD_ACCOUNTS or
        get_first_key(record, 'userIdentity', {}).get('accountId') in _PROD_ACCOUNTS
    )

@rule(
    logs=['cloudtrail:events', 'cloudwatch:events'],  # Rule applies to these 2 schemas
    matchers=[prod_account],  # Must be satisfied before rule is evaluated
    merge_by_keys=['useragent'],  # Merge alerts with the same 'useragent' key-value pair
    merge_window_mins=5,  # Merge alerts every 5 minutes
    outputs=['pagerduty:csirt', 'slack:security']  # Send alerts to these 2 outputs
)
def cloudtrail_root_account_usage(record):
    """Page security team for any usage of AWS root account"""
    return (
        get_first_key(record, 'userIdentity', {}).get('type') == 'Root' and
        not get_first_key(record, 'invokedBy') and
        get_first_key(record, 'eventType') != 'AwsServiceEvent'
    )

To simplify rule logic, you can extract common routines into custom helper methods. These helpers are defined in rules/helpers/base.py and can be called from within a matcher or rule (as shown here).

Since rules are written in Python, you can make them as sophisticated as you want!

Rule Options

The following table provides an overview of each rule option, with more details below:

@rule kwarg        Type                  Description
-----------        ----                  -----------
context            Dict[str, Any]        Dynamically configurable context passed to the alert processor
datatypes          List[str]             List of normalized type names the rule applies to
logs               List[str]             List of log schemas the rule applies to
matchers           List[str]             Matcher pre-conditions which must be met before rule logic runs
merge_by_keys      List[str]             List of key names that must match in value before merging alerts
merge_window_mins  int                   Merge related alerts at this interval rather than sending immediately
outputs            List[str]             List of alert outputs
dynamic_outputs    List[function]        List of functions which return valid outputs
req_subkeys        Dict[str, List[str]]  Subkeys which must be present in the record

context

context can pass extra instructions to the alert processor for more precise routing:

# Context provided to the pagerduty-incident output with
# instructions to assign the incident to a user.

@rule(logs=['osquery:differential'],
      outputs=['pagerduty:csirt'],
      context={'pagerduty-incident': {'assigned_user': 'valid_user'}})
def my_rule(record, context):
    context['pagerduty-incident']['assigned_user'] = record['username']
    return True
datatypes

conf/normalized_types.json defines data normalization, whereby you can write rules against a common type instead of a specific field or schema:

"""These rules apply to several different log types, defined in conf/normalized_types.json"""
from streamalert.shared.rule import rule
from streamalert.shared.normalize import Normalizer

@rule(datatypes=['sourceAddress'], outputs=['aws-sns:my-topic'])
def ip_watchlist_hit(record):
    """Source IP address matches watchlist."""
    return '127.0.0.1' in Normalizer.get_values_for_normalized_type(record, 'sourceAddress')

@rule(datatypes=['command'], outputs=['aws-sns:my-topic'])
def command_etc_shadow(record):
    """Command line arguments include /etc/shadow"""
    return any(
        '/etc/shadow' in cmd.lower()
        for cmd in Normalizer.get_values_for_normalized_type(record, 'command')
    )
logs

logs define the log schema(s) supported by the rule.

Log datasources are defined within the data_sources field of a cluster such as conf/clusters/<cluster>.json and their schemas are defined in one or more files in the conf/schemas directory.

Note

Either logs or datatypes must be specified for each rule

matchers

matchers define conditions that must be satisfied in order for the rule to be evaluated. Default matchers are defined in matchers/matchers.py but can also be defined in the rules file (see example above).

A matcher function should accept a single argument, just like rules. That argument will be the record that is being evaluated by the rule.

Rules can utilize matchers to reduce redundancy of code, allowing you to define the logic once and easily use it across multiple rules.
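As a sketch, a matcher defined alongside a rule might look like the following (the matcher, rule, and output names are illustrative):

from streamalert.shared.rule import rule

def guardduty(record):
    """Match only CloudWatch events emitted by GuardDuty"""
    return record.get('source') == 'aws.guardduty'

@rule(logs=['cloudwatch:events'],
      matchers=[guardduty],
      outputs=['slack:security'])
def guardduty_finding(record):
    """Alert on every GuardDuty finding"""
    return True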

merge_by_keys / merge_window_mins

Note

Specify neither or both of these fields, not one of them in isolation

For a better alert triage experience, you can merge alerts whose records share one or more fields in common:

@rule(logs=['your-schema'],
      merge_by_keys=['alpha', 'beta', 'gamma'],
      merge_window_mins=5)
def merged_rule(record):
    return True

The alert merger Lambda function will buffer all of these alerts until 5 minutes have elapsed, at which point

{
  "alpha": "A",
  "nested": {
    "beta": "B"
  },
  "gamma": [1, 2, 3],
  "timestamp": 123
}

would be automatically merged with

{
  "alpha": "A",
  "nested": {
    "beta": "B",
    "extra": "field"
  },
  "gamma": [1, 2, 3],
  "timestamp": 456
}

A single consolidated alert will be sent showing the common keys and the record differences. All of the specified merge keys must have the same value in order for two records to be merged, but those keys can be nested anywhere in the record structure.

Note

The original (unmerged) alert will always be sent to Athena.

dynamic_outputs

The dynamic_outputs keyword argument defines additional outputs for an Alert, generated dynamically at runtime. See dynamic_outputs for more info.

outputs

The outputs keyword argument defines the alert destination if the return value of a rule is True. Alerts are always sent to an Athena alerts table which is easy to query. Any number of additional outputs can be specified.

req_subkeys

req_subkeys defines sub-keys that must exist in the incoming record (with a non-zero value) in order for it to be evaluated.

This feature should be used if you have logs with a loose schema defined in order to avoid raising a KeyError in rules.

# The 'columns' key must contain sub-keys of 'address' and 'hostnames'

@rule(logs=['osquery:differential'],
      outputs=['aws-lambda:my-function'],
      req_subkeys={'columns':['address', 'hostnames']})
def osquery_host_check(rec):
    # If any log lacked the 'address' sub-key, this rule would
    # raise a KeyError.  Using req_subkeys avoids this.
    return rec['columns']['address'] == '127.0.0.1'

Disabling Rules

In the event that a rule must be temporarily disabled, the @disable decorator can be used. This allows you to keep the rule definition and tests in place instead of having to remove them entirely:

from streamalert.shared.rule import disable, rule

@disable  # TODO: this rule is too noisy!
@rule(logs=['example'], outputs=['slack'])
def example_rule(record):
    return True

Testing

For instructions on how to create and run tests to validate rules, see Testing.

Testing

To test the accuracy of new rules, local tests can be written to verify that alerts trigger against valid input.

The manage.py CLI tool comes built-in with a test command which does exactly this.

Configuration

To test a new rule, first create a new JSON file next to your rule file. The suggested convention is to use the same name as the rule you are testing, but you can choose any name you would like. This will help with organization, but you may also create test events to test your rules anywhere within the same top-level directory where your rules are stored.
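For example, following this convention for one of the bundled community rules would result in a layout like:

rules/
└── community/
    └── cloudwatch_events/
        ├── cloudtrail_root_account_usage.py    # rule definition
        └── cloudtrail_root_account_usage.json  # test events for the rule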

Basic Configuration

Each test event file should contain the following structure:

[
  {
    "data": {
      "key_01": "value_01",
      "key_02": "value_02"
    },
    "description": "This test should trigger or not trigger an alert",
    "log": "The log name declared in a json file under the conf/schemas directory",
    "service": "The service sending the log - kinesis, s3, sns, or streamalert_app",
    "source": "The exact resource which sent the log - kinesis stream name, s3 bucket ID, SNS topic name, or streamalert_app function name",
    "trigger_rules": [
      "rule_name_that_should_trigger_for_this_event",
      "another_rule_name_that_should_trigger_for_this_event"
    ]
  }
]

Note

Multiple tests can be included in one file by adding them to the array above.

Specifying Test Data

Test data can be specified in either of two fields:

  1. "data": An entire example record, with all necessary fields to properly classify

  2. "override_record": A subset of the example record, where only relevant fields are populated

The advantage of option #2 is that the overall test event is much smaller.

The testing framework will auto-populate the records behind the scenes with the remaining fields for that given log type.

For example:

[
  {
    "data": {
      "account": "123456789102",
      "detail": {
        "request": {
          "eventName": "putObject",
          "bucketName": "testBucket"
        }
      },
      "detail-type": "API Call",
      "id": "123456",
      "region": "us-west-2",
      "resources": [
        "testBucket"
      ],
      "source": "aws.s3",
      "time": "Jan 01 2018 12:00",
      "version": "1.05"
    },
    "description": "An example test with a full cloudwatch event",
    "log": "cloudwatch:events",
    "service": "s3",
    "source": "test-s3-bucket-name",
    "trigger_rules": [
      "my_fake_rule"
    ]
  }
]

Let’s say a rule is only checking the value of source in the test event. In that case, there’s no added benefit to fill in all the other data. Here is what the event would look like with override_record:

[
  {
    "override_record": {
      "source": "aws.s3"
    },
    "description": "An example test with a partial cloudwatch event",
    "log": "cloudwatch:events",
    "service": "s3",
    "source": "test-s3-bucket-name",
    "trigger_rules": [
      "my_fake_rule"
    ]
  }
]

Both test events would have the same result, but with much less effort.

Note

Either override_record or data is required in the test event

Testing Classification

Classification tests are always run on each test. Consider these two fields in the test configuration:

[
  {
    "log": "cloudwatch:events",
    "classify_only": true
  }
]

The log field in each test specifies the expected classified type of the test record. The test will fail if the classified log type differs.

By default, the test runner will continue on to test rules. If you only wish to test classification, specify classify_only as true.

Testing Rules

Assuming a test is not classify_only, rules are run after classification. Consider this field in the test file:

[
  {
    "trigger_rules": [
      "my_first_fake_rule",
      "my_second_fake_rule"
    ]
  }
]

All rules are run on each set of test data. The trigger_rules field specifies an array of rule names that should be triggered as a result. An empty array implies that the test data should not trigger any rules.

Publisher Tests

Consider the following rule:

@rule(
  logs=['cloudwatch:events'],
  outputs=['slack:sample-channel'],
  publishers={'slack': my_publisher}
)
def my_rule(record):
  # ... rule logic
  return True

To test the output of the Alert Publisher framework, you can specify publisher_tests. Consider this field:

[
  {
    "trigger_rules": ["my_rule"],
    "publisher_tests": {
      "slack:sample-channel": [
        {
          "jmespath_expression": "path.to.record",
          "condition": "is",
          "value": 4
        },
        [ "path.to.other.record", "is", 5 ]
      ]
    }
  }
]

This field is a dictionary, where keys specify outputs to test. Each key’s value is an array of publisher tests. These tests compare the Alert Publisher’s output to a configured expectation.

Each publisher test can be a dict with 3 keys:

  • jmespath_expression: A jmespath search expression. This is run on the Alert Publisher output for the given OutputDispatcher.

  • condition: Either “is” or “in”, for equality or substring/subset matching, respectively.

  • value: The expected value of the field.

The field extracted via the jmespath_expression is tested against the expected value using the given condition.

Note

An alternate shorthand syntax to the above is to specify a triple of strings:

["path.to.field", "is", "value"]
Rule Test Reference

compress (boolean, optional)

Whether or not to compress records with gzip prior to testing. This is useful to simulate services that send gzipped data.

data (map or string, required unless override_record is used)

The record to test against your rules. All json log types should be in JSON object/dict format, while others (csv, kv, or syslog) should be strings.

override_record (map, required unless data is used)

A partial record to use in test events; see Specifying Test Data above.

description (string, required)

A short sentence describing the intent of the test.

log (string, required)

The log type this test record should parse as. The value of this should be taken from the logs defined in one or more files in the conf/schemas directory.

service (string, required)

The name of the service which sent the log. This should be one of: kinesis, s3, sns, or streamalert_app.

source (string, required)

The name of the Kinesis Stream, S3 bucket, SNS topic, or StreamAlert App function where the data originated. This value should match a source provided in the data_sources field defined within a cluster in conf/clusters/<cluster>.json.

trigger_rules (list, optional)

A list of zero or more rule names that this test record should trigger. An empty list implies this record should not trigger any alerts.

classify_only (boolean, optional)

Whether or not the test record should go through the rule processing engine. If set to true, this record will only be tested for valid classification.

publisher_tests (dict, optional)

A dict of tests to run against the Alert's published representation. The keys of the dict are output descriptors, and the values are arrays of individual tests. Publisher tests use jmespath to extract values from the final publication dictionary for testing. At least one rule should be triggered, or publisher tests will do nothing.

test_fixtures (dict, optional)

Values to be mocked out for use within rules for the threat_intel and lookup_tables features. See below for examples of this.

Test Fixtures Configuration

Fixtures for test events should be configured as part of the event itself. These should be added within the threat_intel or lookup_tables keys under a test_fixtures section of the test event. Usage of these two sections is outlined below.

Threat Intel Fixtures

The below format should be used to “mock” out threat intel data to test rules that leverage this feature.

[
  {
    "test_fixtures": {
      "threat_intel": [
        {
          "ioc_value": "1.2.3.4",
          "ioc_type": "ip",
          "sub_type": "mal_ip"
        },
        {
          "ioc_value": "0123456789abcdef0123456789abcdef",
          "ioc_type": "md5",
          "sub_type": "mal_md5"
        }
      ]
    }
  }
]
Lookup Tables Fixtures

The below format should be used to “mock” out lookup table data to test rules that leverage this feature.

[
  {
    "test_fixtures": {
      "lookup_tables": {
        "dynamodb-table-name": {
          "lookup_key": [
            "value_for_rule"
          ]
        }
      }
    }
  }
]

For more examples of how to configure tests for rules, see the provided default rules and tests in the rules/ directory

Running Tests

Tests are run via the manage.py script. These tests include the ability to validate defined log schemas for accuracy, as well as rule efficacy. Additionally, alerts can be sent from the local system to a real, live alerting output (if configured).

The below options are available for running tests. Please note that each subsequent test command here includes all of the prior tests. For instance, the rules command will also test everything that the classifier command tests. See the Test Options section for available options for all of these commands.

Classifier Tests

Running tests to ensure test events classify properly:

python manage.py test classifier

Note

The classifier test command does not test the efficacy of rules, and simply ensures defined test events classify as their expected schema type.

Rule Tests

Running tests to ensure test events classify properly and trigger the designated rules:

python manage.py test rules
Live Tests

Running tests to actually send alerts to a rule’s configured outputs:

python manage.py test live

Note

The live test command does not invoke any deployed Lambda functions, and only uses the local code, test events, and rules. However, authentication secrets needed to send alerts are in fact read from S3 during this process, so AWS credentials must still be set up properly.

Test Options

Any of the test commands above can be restricted to specific files to reduce time and output:

python manage.py test classifier --test-files <test_file_01.json> <test_file_02>

Note

Only the name of the file is required, with or without the file extension, not the entire path.

Tests can also be restricted to specific rules:

python manage.py test rules --test-rules <rule_01> <rule_02>

Note

Note that this is the name of the rule(s) themselves, not the name of the Python file containing the rule(s).

Tests can be directed to run against an alternative directory of test event files:

python manage.py test rules --files-dir /path/to/alternate/test/files/directory


Test Examples

Here is a sample command showing how to run tests against two test event files included in the default StreamAlert configuration:

python manage.py test rules --test-files rules/community/cloudwatch_events/cloudtrail_put_bucket_acl.json rules/community/cloudwatch_events/cloudtrail_root_account_usage.json

This will produce output similar to the following:

Running tests for files found in: rules

File: rules/community/cloudwatch_events/cloudtrail_put_bucket_acl.json

Test #01: Pass
Test #02: Pass

File: rules/community/cloudwatch_events/cloudtrail_root_account_usage.json

Test #01: Pass
Test #02: Pass

Summary:

Total Tests: 4
Pass: 4
Fail: 0

To see more verbose output for any of the test commands, add the --verbose flag. The previous command, with the addition of the --verbose flag, produces the following output:

Running tests for files found in: rules

File: rules/community/cloudwatch_events/cloudtrail_put_bucket_acl.json

Test #01:

    Description: Modifying an S3 bucket to have a bucket ACL of AllUsers or AuthenticatedUsers should create an alert.

    Classification: Pass
        Classified Type: cloudwatch:events
        Expected Type: cloudwatch:events

    Rules: Pass
        Triggered Rules: cloudtrail_put_bucket_acl
        Expected Rules: cloudtrail_put_bucket_acl

Test #02:

    Description: Modifying an S3 bucket ACL without use of AllUsers or AuthenticatedUsers should not create an alert.

    Classification: Pass
        Classified Type: cloudwatch:events
        Expected Type: cloudwatch:events

    Rules: Pass
        Triggered Rules: <None>
        Expected Rules: <None>


File: rules/community/cloudwatch_events/cloudtrail_root_account_usage.json

Test #01:

    Description: Use of the AWS 'Root' account will create an alert.

    Classification: Pass
        Classified Type: cloudwatch:events
        Expected Type: cloudwatch:events

    Rules: Pass
        Triggered Rules: cloudtrail_root_account_usage
        Expected Rules: cloudtrail_root_account_usage

Test #02:

    Description: AWS 'Root' account activity initiated automatically by an AWS service on your behalf will not create an alert.

    Classification: Pass
        Classified Type: cloudwatch:events
        Expected Type: cloudwatch:events

    Rules: Pass
        Triggered Rules: <None>
        Expected Rules: <None>


Summary:

Total Tests: 4
Pass: 4
Fail: 0

Additionally, any given test that results in a status of Fail will, by default, print verbosely. In the below example, the cloudtrail_put_bucket_acl.json file has been altered to include a triggering rule that does not actually exist.

python manage.py test rules --test-files rules/community/cloudwatch_events/cloudtrail_put_bucket_acl.json rules/community/cloudwatch_events/cloudtrail_root_account_usage.json
Running tests for files found in: rules

File: rules/community/cloudwatch_events/cloudtrail_put_bucket_acl.json

Test #01:

    Description: Modifying an S3 bucket to have a bucket ACL of AllUsers or AuthenticatedUsers should create an alert.

    Classification: Pass
        Classified Type: cloudwatch:events
        Expected Type: cloudwatch:events

    Rules: Fail
        Triggered Rules: cloudtrail_put_bucket_acl
        Expected Rules: cloudtrail_put_bucket_acl, nonexistent_rule (does not exist)

Test #02: Pass

File: rules/community/cloudwatch_events/cloudtrail_root_account_usage.json

Test #01: Pass
Test #02: Pass

Summary:

Total Tests: 4
Pass: 3
Fail: 1

Helpers

It may occasionally be necessary to dynamically fill in values in the test event data. For instance, if a rule relies on the time of an event, the last_hour helper can be embedded in a test event as a key’s value. The embedded helper string will be replaced with the value returned by the helper function.

Available Helpers

last_hour: Generates a unix epoch time within the last hour (ex: 1489105783).

Usage

To use these helpers in rule testing, replace a specific log field value with the following:

"<helper:helper_name_goes_here>"

For example, to replace a time field with a value in the last hour, use last_hour:

[
  {
    "data": {
      "host": "app01.prod.mydomain.net",
      "time": "<helper:last_hour>"
    },
    "description": "example usage of helpers",
    "log": "host_time_log",
    "service": "kinesis",
    "source": "my_demo_kinesis_stream",
    "trigger_rules": [
      "last_hour_rule_name"
    ]
  }
]

Outputs

StreamAlert comes with a flexible alerting framework that can integrate with new or existing case/incident management tools. Rules can send alerts to one or many outputs.

Out of the box, StreamAlert supports:

  • Amazon CloudWatch Logs

  • Amazon Kinesis Firehose

  • AWS Lambda

  • AWS S3

  • AWS SES

  • AWS SNS

  • AWS SQS

  • Carbon Black

  • Demisto

  • GitHub

  • Jira

  • Komand

  • PagerDuty

  • Phantom

  • Slack

  • Microsoft Teams

StreamAlert can be extended to support any API. Creating a new output to send alerts to is easily accomplished through inheritance from the OutputDispatcher class. More on that in the Adding Support for New Services section below.

With the addition of an output configuration, multiple outputs per service are now possible. As an example, it is now possible for rules to dispatch alerts to multiple people or channels in Slack.

Adhering to the secure by default principle, all API credentials are encrypted and decrypted using AWS Key Management Service (KMS). Credentials are stored on Amazon S3 and are not packaged with the StreamAlert code. They are downloaded and decrypted on an as-needed basis. Credentials are never cached on disk in a decrypted state.

Configuration

Adding a new configuration for a currently supported service is handled using manage.py:

python manage.py output <SERVICE_NAME>

Note

<SERVICE_NAME> above should be one of the following supported service identifiers. aws-cloudwatch-log, aws-firehose, aws-lambda, aws-lambda-v2, aws-s3, aws-sns, aws-sqs, carbonblack, github, jira, jira-v2, komand, pagerduty, pagerduty-incident, pagerduty-v2, phantom, slack

For example:

python manage.py output slack

The above command will then prompt the user for a descriptor to use for this configuration:

Please supply a short and unique descriptor for this Slack integration (eg: channel, group, etc):

After a descriptor is provided, the user is then prompted for the Slack webhook URL:

Please supply the full Slack webhook url, including the secret:

Note

The user input for the Slack webhook URL will be masked. This ‘masking’ approach currently applies to any potentially sensitive information the user may have to enter on the CLI, and can be enforced in any new services that are implemented.

Adding Support for New Services

Adding support for a new service involves five steps:

  1. Create a subclass of OutputDispatcher

  • For reference, OutputDispatcher is declared in streamalert/alert_processor/outputs/output_base.py

  2. Implement the following methods, at a minimum:

from streamalert.alert_processor.helpers import compose_alert


def get_user_defined_properties(self):
  """Returns any properties for this output that must be provided by the user
  At a minimum, this method should prompt the user for a 'descriptor' value to
  use for configuring any outputs added for this service.

  Returns:
      [OrderedDict] Contains various OutputProperty items
  """
  return OrderedDict([
      ('descriptor',
       OutputProperty(description='a short and unique descriptor for this service configuration '
                                  '(ie: name of integration/channel/service/etc)'))
  ])

def _dispatch(self, alert, descriptor):
  """Handles the actual sending of alerts to the configured service.
  Any external API calls for this service should be added here.
  This method should return a boolean where True means the alert was successfully sent.

  In general, use the compose_alert() method defined in streamalert.alert_processor.helpers
  when presenting the alert in a generic polymorphic format to be rendered on the chosen output
  integration. This is so specialized Publishers can modify how the alert is represented on the
  output.

  In addition, adding output-specific fields can be useful to offer more fine-grained control
  of the look and feel of an alert.

  For example, an optional field that directly controls a PagerDuty incident's title:
  - '@pagerduty.incident_title'


  When referencing an alert's attributes, reference the alert's field directly (e.g.
  alert.alert_id). Do not rely on the published alert.
  """

  publication = compose_alert(alert, self, descriptor)
  # ...
  return True

See below for more information on the OutputProperty object.

  3. Implement the private __service__ property within the new subclass.

    • This should be a string value that corresponds to an identifier that best represents this service. (eg: __service__ = 'aws-s3')

  4. Add the @StreamAlertOutput class decorator to the new subclass so it is registered when the outputs module is loaded.

  5. Extend the AlertProcessorTester.setup_outputs method in the streamalert_cli/test.py module to provide mock credentials for your new output.

The OutputProperty Object

The OutputProperty object used in get_user_defined_properties is a namedtuple consisting of a few properties:

description

A description that is used when prompting the user for input. This is to help describe what is expected from the user for this property. At a bare minimum, this property should be set for all instances of OutputProperty. Default is: '' (empty string)

value

The actual value that the user enters for this property. This is replaced using namedtuple._replace during user input. Default is: '' (empty string)

input_restrictions

A set of character values that should be restricted from user input for this property. Default is: {' ', ':'}

mask_input

A boolean that indicates whether the user’s input should be masked using getpass during entry. This should be set for any input that is potentially sensitive. Default is: False

cred_requirement

A boolean that indicates whether this value is required for API access with this service. Ultimately, setting this value to True indicates that the value should be encrypted and stored in AWS Systems Manager. Default is: False
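Putting these properties together, a get_user_defined_properties implementation for a hypothetical service might look like the following sketch, assuming OutputProperty is imported from the same output_base module where OutputDispatcher is declared (the api_token property is illustrative):

from collections import OrderedDict

from streamalert.alert_processor.outputs.output_base import OutputProperty

def get_user_defined_properties(self):
    """Prompt for a descriptor and a sensitive API token"""
    return OrderedDict([
        ('descriptor',
         OutputProperty(description='a short and unique descriptor for this integration')),
        ('api_token',
         OutputProperty(description='the API token for this service',
                        mask_input=True,          # hide the value during entry
                        cred_requirement=True))   # encrypt and store as a credential
    ])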

Strategy

A common strategy that has been found to be effective:

  • Write your rule and only designate a notification-style service, such as Slack, as an output

  • Identify false positives, refine the rule over a period of time

  • “Promote” the rule to production by removing Slack and adding PagerDuty and S3 as outputs

Why:

  • Slack alerts are ephemeral, great for new/beta rules

  • PagerDuty supports webhooks and can still ping Slack

  • S3 will act as a persistent store for production alerts (audit trail, historical context)

Dynamic Outputs

Prerequisites

  • Any output assigned must be added with python manage.py output

  • Functions must return None, str, or List[str], where each string maps to an output configured with the above.

  • Only pass context if the rule sets context.

Overview

Dynamic outputs add the ability to run custom logic that defines an output or outputs based on information within the record. For information on supported outputs and how to add support for additional outputs, see outputs

As the examples below show, dynamic outputs are easy to configure and add a very useful capability to StreamAlert.

  • StreamAlert sends to all outputs defined within a rule's outputs=[] and dynamic_outputs=[] when sending Alerts.

  • It is also possible to pass context to the dynamic_function if the rule sets it.

Note

Any output passed must be configured with ./manage.py output -h

Example: Simple

The code block below is considered a simple dynamic_output function because the outputs are chosen dynamically, but the information used still lives within the code. This example:

  • allows you to maintain a static map of accounts to teams inside your code

  • returns the outputs relevant to the team that owns the account

  • results in Alerts being sent to the aws-sns:security output as well as those returned by the function

from streamalert.shared.rule import rule

def map_account_to_team(record):
    teams = {
      "team_a": {"accounts": ["123", "456", ...], "outputs": ["aws-sns:team_a"]},
      "team_b": {"accounts": ["789", ...], "outputs": ["aws-sns:team_b", "slack:team_b"]},
    }

    account_id = record.get('recipientaccountid')

    for team in teams.values():
        if account_id in team["accounts"]:
            return team["outputs"]
    # None is guarded against by StreamAlert

@rule(
  logs=['cloudwatch:events'],
  req_subkeys={
    'detail': ['userIdentity', 'eventType']
  },
  outputs=["aws-sns:security"],
  dynamic_outputs=[map_account_to_team]
)
def cloudtrail_root_account_usage(rec):
    ...  # Rule logic
Example: With LookupTables

With the simple addition of a lookup table, you can take a rule like cloudtrail_root_account_usage and configure it as follows:

from streamalert.shared.rule import rule
from streamalert.shared.lookup_tables.core import LookupTables

def dynamic_output_with_context(record, context): # pass context only if the rule added context
    account_id = context["account_id"]

    return LookupTables.get(
      'my_lookup_table',
      'aws-account-owner:{}'.format(account_id),
      None
    ) # potentially returns [aws-sns:team_a]

@rule(
  logs=['cloudwatch:events'],
  outputs=["aws-sns:security"],
  dynamic_outputs=[dynamic_output_with_context],
  context={"account_id": "valid_account_id"},
)
def cloudtrail_root_account_usage(rec, context):
    context["account_id"] = rec.get('recipientaccountid')
    ...  # Rule logic

The above has the benefit of using information that lives outside of StreamAlert, which means teams can acquire new accounts and get Alerts without having to alter StreamAlert code.

Example: With Other Data Source
from streamalert.shared.rule import rule
import requests

def dynamic_output(record):
    account_id = record.get('recipientaccountid')

    # invoke an external API to get data back
    response = requests.get("API/team_map")

    for team in response.json():
        if account_id in team["accounts"]:
            return team["outputs"] # potentially "aws-lambda:team_a"

@rule(
  logs=['cloudwatch:events'],
  outputs=["aws-sns:security"],
  dynamic_outputs=[dynamic_output],
)
def cloudtrail_root_account_usage(rec):
    ...  # Rule logic

The above example uses an external API to get the output map, which is to be queried with the account_id on the record. This is just an example, but hopefully highlights many ways in which dynamic_outputs can be used.

Warning

The above example could result in many queries to the API in use and could potentially slow down StreamAlert Lambdas when processing Alerts.

Publishers

Publishers are a framework for transforming alerts prior to dispatching to outputs, on a per-rule basis. This allows users to customize the look and feel of alerts.

How do Publishers work?

Publishers are blocks of code that are run during alert processing, immediately prior to dispatching an alert to an output.

Implementing new Publishers

All publishers must be added to the publishers directory. Publishers have two valid syntaxes:

Function

Implement a top-level function that accepts two arguments: an Alert and a dict. Decorate this function with the @Register decorator.

from streamalert.shared.alert import Alert
from streamalert.shared.publisher import Register

@Register
def my_publisher(alert: Alert, publication: dict) -> dict:
  # ...
  return {}
Class

Implement a class that inherits from AlertPublisher and implements publish(). Decorate the class with the @Register decorator.

from streamalert.shared.alert import Alert
from streamalert.shared.publisher import AlertPublisher, Register

@Register
class MyPublisherClass(AlertPublisher):

  def publish(self, alert: Alert, publication: dict) -> dict:
    # ...
    return {}

Preparing Outputs

In order to take advantage of Publishers, all outputs must be implemented with the following guidelines:

Use compose_alert()

When presenting unstructured or miscellaneous data to an output (e.g. an email body, incident details), outputs should be implemented to use the compose_alert(alert: Alert, output: OutputDispatcher, descriptor: str) -> dict method.

compose_alert() loads all publishers relevant to the given Alert and executes these publishers in series, returning the result of the final publisher.

All data returned by compose_alert() should be assumed to be optional.

from streamalert.alert_processor.helpers import compose_alert

def _dispatch(self, alert, descriptor):
  # ...
  publication = compose_alert(alert, self, descriptor)
  make_api_call(misc_data=publication)
“Default” Implementations

For output-specific fields that are mandatory (such as an incident Title or assignee), each output should offer a default implementation:

def _dispatch(self, alert, descriptor):
  default_title = 'Incident Title: #{}'.format(alert.alert_id)
  default_html = '<html><body>Rule: {}</body></html>'.format(alert.rule_description)
  # ...
Custom Fields

Outputs can be implemented to offer custom fields that can be filled in by Publishers. This (optionally) grants fine-grained control of outputs to Publishers. Such fields should adhere to the following conventions:

  • They are top level keys on the final publication dictionary

  • Keys are strings, following the format: @{output_service}.{field_name}

  • Keys MUST begin with an at-sign

  • The output_service should match the current output's cls.__service__ value

  • The field_name should describe its function

  • Example: @slack.attachments

Below is an example of how you could implement an output:

def _dispatch(self, alert, descriptor):
  # ...
  publication = compose_alert(alert, self, descriptor)

  default_title = 'Incident Title: #{}'.format(alert.alert_id)
  default_html = '<html><body>Rule: {}</body></html>'.format(alert.rule_description)

  title = publication.get('@pagerduty.title', default_title)
  body_html = publication.get('@pagerduty.body_html', default_html)

  make_api_call(title, body_html, data=publication)
Alert Fields

When outputs require mandatory fields that are not subject to publishers, they should reference the alert fields directly:

def _dispatch(self, alert, descriptor):
  rule_description = alert.rule_description
  # ...

Registering Publishers

Register publishers on a rule using the publisher argument on the @rule decorator:

from publishers import publisher_1, publisher_2
from streamalert.shared.rule import rule

@rule(
  logs=['stuff'],
  outputs=['pagerduty', 'slack'],
  publishers=[publisher_1, publisher_2]
)
def my_rule(rec):
  # ...

The publishers argument is a structure containing references to Publishers and can follow any of the following structures:

Single Publisher
publishers=publisher_1

When using this syntax, the given publisher will be applied to all outputs.

List of Publishers
publishers=[publisher_1, publisher_2, publisher_3]

When using this syntax, all given publishers will be applied to all outputs.

Dict mapping Output strings to Publisher
publishers={
  'pagerduty:analyst': [publisher_1, publisher_2],
  'pagerduty': [publisher_3, publisher_4],
  'demisto': other_publisher,
}

When using this syntax, publishers under each key will be applied to their matching outputs. Publisher keys with generic outputs (e.g. pagerduty) are loaded first, before publisher keys that pertain to more specific outputs (e.g. pagerduty:analyst).

The order in which publishers are loaded will dictate the order in which they are executed.

DefaultPublisher

When the publishers argument is omitted from a @rule, a DefaultPublisher is loaded and used. This also occurs when the publishers are misconfigured.

The DefaultPublisher is reverse-compatible with old implementations of alert.output_dict().

Putting It All Together

Here’s a real-world example of how to effectively use Publishers and Outputs:

PagerDuty requires all Incidents be created with an Incident Summary, which appears as the title of every incident in its UI. You can optionally supply custom details, which appear below the title as a large, unstructured body.

By default, the PagerDuty integration sends "StreamAlert Rule Triggered - rule_name" as the Incident Summary, along with the entire Alert record in the custom details.

However, the entire record can contain mostly irrelevant or redundant data, which can pollute the PagerDuty UI and make triage slower, as responders must filter through a large record to find the relevant pieces of information. This is especially true for alerts of very limited scope and well-understood remediation steps.

Consider an example where informational alerts are triggered upon login into a machine. Responders only care about the time of login, source IP address, and the username of the login.

You can implement a publisher that only returns those three fields and strips out the rest from the alert. The publisher can also simplify the PagerDuty title:

from streamalert.shared.publisher import Register

@Register
def simplify_pagerduty_output(alert, publication):
  return {
    '@pagerduty.record': {
        'source_ip': alert.record['source_ip'],
        'time': alert.record['timestamp'],
        'username': alert.record['user'],
    },
    '@pagerduty.summary': 'Machine SSH: {}'.format(alert.record['user']),
  }

Suppose this rule is being output to both PagerDuty and Slack, but you only wish to simplify the PagerDuty integration, leaving the Slack integration the same. Registering the publisher can be done as such:

from publishers.pagerduty import simplify_pagerduty_output
from streamalert.shared.rule import rule

@rule(
  logs=['ssh'],
  outputs=['slack:engineering', 'pagerduty:engineering'],
  publishers={
    'pagerduty:engineering': simplify_pagerduty_output,
  }
)
def machine_ssh_login(rec):
  # ...

Lookup Tables

LookupTables is a framework for injecting additional data into StreamAlert Lambda functions. LookupTables offers a unified key-value interface into a set of backend storage solutions, allowing StreamAlert Lambda functions to use state from outside of the raw telemetry that they receive.

With LookupTables, StreamAlert can hydrate alerting data, add statefulness to alerts, scalably pull down remote data, rapidly tune rule logic, and much more!

How do LookupTables work?

LookupTables provides a unified Python interface to backend data storage mechanisms. The two currently supported storage solutions are Amazon S3 and Amazon DynamoDB.

LookupTables makes these storage solutions available to StreamAlert’s Lambda functions. It is available on all classifiers, the rules engine, the alert merger, and the alert processor.

Usage

Pulling keys from LookupTables is very easy!

from streamalert.shared.lookup_tables.core import LookupTables

value = LookupTables.get('my-table', 'my-key', 'default-value')

The three arguments are as follows:

  1. Table Name — The name of the LookupTable. This is specified in the config (below).

  2. Key — A key on the given LookupTable.

  3. Default Value — If the key is not found, this value will be returned instead. Notably, if the key exists but its value is empty, the empty value will be returned, NOT this default value.
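For instance, a rule might consult the my-table-1 table from the configuration below to check a denylist (the key format and record fields here are illustrative):

from streamalert.shared.lookup_tables.core import LookupTables
from streamalert.shared.rule import rule

@rule(logs=['cloudwatch:events'], outputs=['slack:security'])
def activity_from_denylisted_ip(record):
    """Alert when the calling IP appears in a denylist stored in a LookupTable"""
    source_ip = record.get('detail', {}).get('sourceIPAddress')
    # A missing key falls back to the default (False), so unknown IPs do not alert
    return bool(LookupTables.get('my-table-1', 'denylisted-ip:{}'.format(source_ip), False))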

Configuration

LookupTables is configured via a single file, conf/lookup_tables.json.

{
  "enabled": false,
  "tables": {
    "my-table-1": {
      "driver": "dynamodb",
      "table": "dynamodb-table-name",
      "partition_key": "partition-key",
      "value_key": "value-column",
      "cache_maximum_key_count": 3,
      "cache_refresh_minutes": 3,
      "consistent_read": false
    },
    "my-table-2": {
      "driver": "s3",
      "bucket": "s3-bucket-name",
      "key": "file.json",
      "compression": false,
      "cache_refresh_minutes": 10
    }
  }
}
  • enabled — (bool) Pass true to activate LookupTables. Leave false to disable.

  • tables — (dict) A dict mapping the name of a LookupTable to its corresponding configuration. The exact configuration varies from driver to driver. See below:

S3 Driver

This driver uses Amazon S3. It stores all of a LookupTable's data in a single file in an S3 bucket, specified in the configuration:

{
  "driver": "s3",
  "bucket": "airbnb.sample.lookuptable",
  "key": "resource_map.json.gz",
  "compression": "gzip",
  "cache_refresh_minutes": 10
}
  • driver — (str) Use s3

  • bucket — (str) The name of the S3 bucket. It must be in the same AWS account. NOTE: Multiple S3 LookupTables can use the same bucket, as long as they reference different keys.

  • key — (str) The S3 object key (aka filename) in the bucket.

  • compression — (str|bool) The compression algorithm of the S3 object. Currently only supports gzip. Pass false if the object is not compressed and is stored as JSON plaintext.

  • cache_refresh_minutes — (int) Number of minutes to cache the entire table. See the Caching section below.

The Nitty Gritty

Because the S3 driver stores all data in a single S3 file, it loads the entire table upfront. This benefits StreamAlert code that scans multiple keys back-to-back, as only a single HTTP call will be made to S3, and subsequent calls will be served from the caching layer.

On the other hand, because the entire S3 file is loaded into memory, large S3 files can risk running into the memory ceiling of StreamAlert’s Lambda functions.

DynamoDB Driver

This driver uses DynamoDB as the storage layer. This driver stores individual keys as discrete rows on the DynamoDB table. The DynamoDB driver can be configured to respect both tables with a single partition key, as well as tables with both a partition and a sort key.

{
  "driver": "dynamodb",
  "table": "some_table_name",
  "partition_key": "MyPartitionKey",
  "sort_key": "MySortKey",
  "value_key": "MyValueKey",
  "consistent_read": false,
  "key_delimiter": ":",
  "cache_refresh_minutes": 2,
  "cache_maximum_key_count": 10
}
  • driver — (str) Use dynamodb

  • table — (str) The name of the DynamoDB table. This table must be in the same AWS region as the StreamAlert deployment.

  • partition_key — (str) The name of the partition key. The partition key MUST be a string type.

  • sort_key — (str) (Optional) The name of the sort key, if one exists. The sort key MUST be a string type.

  • value_key — (str) The name of the value column. NOTE: Multiple LookupTables can be “overlapped” on a single DynamoDB table by using different value_key settings.

  • consistent_read — (bool) (Optional) When true, forces DynamoDB queries to be strongly consistent. This reduces performance (potentially increasing HTTP latency during DynamoDB calls), but guarantees that modified LookupTables values are immediately visible. Passing false allows eventually consistent reads, which can greatly improve performance.

  • key_delimiter — (str) (Optional) When accessing keys in a DynamoDB LookupTable that uses both a partition_key and a sort_key, the syntax of the final key is {partition_key}{delimiter}{sort_key}. The default delimiter is a colon (:), but this parameter can be provided to use a different delimiter. See the sketch after this list.

  • cache_refresh_minutes — (int) Number of minutes to cache each individual key.

  • cache_maximum_key_count — (int) Maximum number of keys to cache on this LookupTable. Once the cache is full, keys will be evicted on a random-selection basis.
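Assuming a hypothetical table configured with both a partition_key and a sort_key, and the default : delimiter, a composite-key lookup sketch looks like this:

from streamalert.shared.lookup_tables.core import LookupTables

# The final key is '{partition_key}{delimiter}{sort_key}'
metadata = LookupTables.get('host-metadata', 'us-east-1:web-01', {})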

The Nitty Gritty

The DynamoDB driver is designed to retrieve a minimal amount of data per request. This reduces the memory footprint compared to the S3 driver, and can reduce the Lambda memory limit required to prevent out-of-memory errors.

As a tradeoff, rapid back-to-back accesses of different keys will result in many HTTP calls being made to DynamoDB, which can slow down StreamAlert’s Lambda execution.

Caching

To reduce redundant requests to storage layers, LookupTables offers a simple in-memory caching layer. It can be configured using the cache_refresh_minutes configuration setting under each driver.

This persists data retrieved from the storage solutions in memory for a number of minutes. It can increase Lambda memory consumption, but can also reduce runtime by reducing the number of HTTP calls.

Putting Data Into LookupTables

It is not (yet) advisable for StreamAlert Lambdas to write values into LookupTables. Instead, external Lambdas (or other processes) should manage the data in LookupTables.

CLI Management

There is a StreamAlert CLI command for managing LookupTables, python manage.py lookup-tables, with three subcommands:

  • describe-tables

  • get

  • set

Use the -h flag to learn how to use them.
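For example, to view the usage of each subcommand:

python manage.py lookup-tables describe-tables -h
python manage.py lookup-tables get -h
python manage.py lookup-tables set -h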

Best Practices

This section documents several best practices in no particular order.

Organize LookupTables Data

While LookupTables can store arbitrary data in arbitrary tables under arbitrary keys, for usage patterns that push scaling limits it is generally advisable to organize data into tables that are optimized for their access patterns.

It is advisable to split the data into many LookupTables, each containing data of similar access patterns.

When to use S3, and when to use Dynamo

Because it can condense the entire data fetching process into a single HTTP request, the S3 driver functions most optimally with small data sets that are often accessed together or interdependently. It is generally inadvisable to store massive amounts of data on a single S3 file.

S3 is ideal for “table scan” types of data. For example, long lists of IP addresses, whitelists, or dict mappings of hosts to metadata. S3 is also ideal for data that is often used together.

Caching Best Practices

Really, we haven’t found any reason to stress out about these values. Setting 5 minutes or 10 minutes is enough.

A more effective lever is the DynamoDB driver’s cache_maximum_key_count, which allows more fine-grained control of the maximum memory consumption of the cache.

Prefer Eventually Consistent Reads

We strongly recommend allowing eventually consistent reads on the DynamoDB driver. The public SLA for eventually consistent reads is 20 seconds, with a typical delay of less than 3 seconds.

Deployment

When LookupTables are configured properly, a subsequent run of python manage.py generate or python manage.py build will create a new file, terraform/lookup_tables.tf.json, and build the appropriate IAM permissions for the StreamAlert Lambdas to access them.

It will not build the actual S3 buckets or DynamoDB tables, however. Those resources have to be built elsewhere.

Usage Ideas

Whitelist

Instead of placing whitelists inline in code:

IP_WHITELIST = [
  '2.2.2.2',
  '8.8.8.8',
  '8.0.8.0',
]

Consider using LookupTables:

IP_WHITELIST = LookupTables.get('whitelists', 'ip_whitelist', [])
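
A rule can then consume the whitelist at evaluation time. Here is a minimal sketch; the rule, its log source, and the record fields are hypothetical:

from streamalert.shared.lookup_tables.core import LookupTables
from streamalert.shared.rule import rule

@rule(logs=['cloudwatch:events'])
def unapproved_source_ip(rec):
    # Alert when the record's source IP is not in the whitelist
    ip_whitelist = LookupTables.get('whitelists', 'ip_whitelist', [])
    return rec['detail'].get('sourceIPAddress') not in ip_whitelist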
External Configuration

Suppose StreamAlert receives a piece of telemetry that includes a hostname:

{
  "hostname": "securityiscool.airbnb.com",
  ...
}

But suppose the rules logic requires an IP address instead. LookupTables can be used to retrieve real-time DHCP or DNS information about that hostname, even if the IP address is not available in the original telemetry.

@rule(
  # ...
)
def my_rule(rec):
  hostname = get_key(rec, 'hostname')
  dns_metadata = LookupTables.get('dns_information', 'host:{}'.format(hostname), {})
  # rules logic here...

Apps

App Configuration

For StreamAlert and other related platforms, log forwarding is usually left as an exercise to the reader. This work is non-trivial and often requires new infrastructure and code. We wanted to make this easier for everyone and have achieved this through StreamAlert Apps.

Apps allow you to collect logs from popular services and applications in minutes. You simply provide the application’s credentials and StreamAlert will deploy an individual serverless application that will fetch and forward logs to StreamAlert for analysis and alerting.

Concepts

Apps are made possible through the use of AWS technologies such as AWS Lambda, CloudWatch Events, and SSM Parameter Store.

Supported Services
  • Duo

    • Authentication Logs

    • Administrator Logs

  • OneLogin

    • Events Logs

  • G Suite Reports (Activities)

    • Admin

    • Calendar

    • Google Drive

    • Groups

    • Google Plus

    • Logins

    • Mobile Audit

    • Rules

    • SAML

    • Authorization Tokens

  • Box

    • Admin Events

  • Slack

    • Access Logs

    • Integrations Logs

  • Intercom

    • Admin Activity Logs

  • More to come

Getting Started

An initial deploy of StreamAlert must be performed before Apps can be configured. If you haven’t deployed StreamAlert yet, please visit the Getting Started page to get up and running.

Deploying an App only takes 3 steps:

  1. Configure the App through the CLI (via python manage.py app new).

  2. Enter the required authentication information.

  3. Deploy the new App and the Classifier.

To get help configuring a new App, use:

python manage.py app new --help
Configuring an App

The StreamAlert CLI is used to add a new App configuration.

python manage.py app new \
--type duo_auth \
--cluster prod \
--name duo_prod_collector \
--interval 'rate(2 hours)' \
--timeout 80 \
--memory 128

  • --type — Type of app integration function being configured. Current choices are: duo_auth, duo_admin

  • --cluster — Applicable cluster this function should be configured against.

  • --name — Unique name to be assigned to the App. This is useful when configuring multiple accounts per service.

  • --interval — The interval, defined using a ‘rate’ expression, at which this app integration function should execute. See AWS Schedule Rate Expressions.

  • --timeout — The AWS Lambda function timeout value, in seconds. This should be an integer between 10 and 300.

  • --memory — The AWS Lambda function max memory value, in megabytes. This should be an integer between 128 and 1536.

Note

Duo Security’s Admin API is limited to two (2) requests per-minute. Therefore, setting the --timeout flag to any value between 10 and 60 will be of no additional value. A recommended timeout of 80 seconds will guarantee four (4) requests happen per-execution.

Enter the Authentication Info

The above command will result in a few prompts asking for the required authentication information needed to configure this App.

Note

After the last required authentication value is entered, the values are sent to AWS SSM’s Parameter Store as a SecureString to be used as part of this App’s config. Due to this requirement, please ensure you have the correct and valid AWS credentials loaded before continuing.

Example Prompts for Duo Auth
Please supply the API URL for your duosecurity instance. This should be in a format similar to 'api-abcdef12.duosecurity.com': api-abcdef12.duosecurity.com

Please supply the secret key for your duosecurity Admin API. This should be a string of 40 alphanumeric characters: 123424af2ae101d47d9704b783c940dffa825678

Please supply the integration key for your duosecurity Admin API. This should be in a format similar to 'DIABCDEFGHIJKLMN1234': DIABCDEFGHIJKLMN1234

Once the above is completed, a logger statement similar to the following will confirm the configuration:

StreamAlertCLI [INFO]: App authentication info successfully saved to parameter store.
StreamAlertCLI [INFO]: Successfully added 'duo_prod_collector' app integration to 'conf/clusters/prod.json' for service 'duo_auth'.

Your configuration file conf/clusters/<cluster>.json has now been updated and is ready to be deployed.

Deploy the Functions

The recommended process is to deploy both the apps function and the classifier processor function with:

python manage.py deploy --functions classifier apps
Authorizing the Slack App

The Slack endpoint API requires a bearer token, obtained by going through the Slack OAuth authentication process. Only one path through the process is supported by the Slack App: manually installing a custom integration.

To obtain the bearer token, an administrator of the Slack workspace must create a custom Slack app, add the admin permission scope to the custom app, and install the app to the target workspace.

Step by step:

  1. Visit the Create a Slack app page, and in the Create a Slack App dialog box fill in the App Name field with whatever you like, then select the target workspace from the Development Slack Workspace dropdown box. Click Create App.

  2. On the Basic Information page of the app you just created, scroll to and click on OAuth & Permissions on the left hand sidebar.

  3. Scroll to the Scopes section, click on the dropdown box under Select Permission Scopes, and type admin to bring up the administrator scope (labeled Administer the workspace). Select it, then click Save changes.

  4. Scroll to the top of that same page and click on Install App to Workspace. Click Authorize on the next dialog. You should be returned to the OAuth & Permissions page.

  5. The bearer token is the string labeled with OAuth Access Token and beginning with xoxp-. Provide this when configuring the Slack StreamAlert app.

Enabling the Aliyun App

The Aliyun API requires an access key and access key secret for an authorized user.

To obtain the access key and access key secret, an authorized user of the Aliyun account should follow their directions to Create an Access Key.

Additionally, the user for whom the access key was created must have sufficient privileges to make use of ActionTrail; follow the directions on the Grant ActionTrail permissions to RAM users page.

How to set up the Intercom App

The Intercom API requires an access token. Get an access token by following these instructions.

To specify an API version, follow these instructions to do so through Intercom’s Developer Hub. The default will be the latest stable version. The Intercom app works on versions 1.2 or later.

Updating an App’s Credentials

You may need to change an App’s credentials due to internal rotation policies or otherwise. The StreamAlert CLI allows you to easily update App credentials. To aid in this process, the CLI also gives you the ability to list currently configured Apps.

Listing Apps

To list currently configured Apps (grouped by cluster), use the CLI command:

python manage.py app list

Example output:

Cluster: prod

  Name: duo_prod_collector
    log_level:                     info
    interval:                      rate(2 hours)
    timeout:                       80
    memory:                        128
    type:                          duo_auth

Note

The output will show No Apps configured if you have not configured any Apps.

Updating Credentials

To update an App’s credentials, run the following command:

python manage.py app update-auth --cluster <cluster> --name <app_name>

This will have you follow a process similar to configuring a new App.

Developing a New App

An App can be created to collect logs from virtually any RESTful API that supports HTTP GET requests.

Developing an App for a currently unsupported service is as easy as:

  1. Add a new file in streamalert/apps/_apps to correspond to the new service (eg: box.py).

  2. Create a subclass of the AppIntegration class found in streamalert/apps/app_base.py.

  3. Implement the required abstract properties and methods on the new subclass.

Example

This is a non-functional example of adding a new App for the Box Events API. It outlines which methods from the base AppIntegration class must be implemented and what those methods must do.

# streamalert/apps/_apps/box.py
import re

from . import AppIntegration, LOGGER, StreamAlertApp

@StreamAlertApp
class BoxApp(AppIntegration):
  """Box StreamAlert App"""

  _BOX_API_V2_EVENTS_ENDPOINT = 'https://api.box.com/2.0/events'
  _MAX_EVENTS_LIMIT = 500

  # Implement this abstractproperty
  @classmethod
  def service(cls):
    return 'box'

  # Implement this abstractproperty
  @classmethod
  def _type(cls):
    return 'admin_logs'

  # Implement this abstractmethod
  def required_auth_info(self):
    return {
        'secret_key':
            {
                'description': ('the secret key for this Box instance...'),
                'format': re.compile(r'...')
            },
        'client_id':
            {
                'description': ('the client_id for this Box instance...'),
                'format': re.compile(r'...')
            },
        'token':
            {
                'description': ('the token for this Box instance...'),
                'format': re.compile(r'...')
            }
        }

  # Implement this abstractmethod
  def _sleep_seconds(self):
    """Return the number of seconds this polling function should sleep for between requests

    Box imposes the following API limits: 10 API calls per second per user
    Box reference: https://developer.box.com/reference#rate-limiting

    Basically, this function should guarantee we sleep for 1 second every 10 requests

    Returns:
        int: Number of seconds that this function should sleep for between requests
    """
    return self._poll_count / 10 * 1

  # Implement this abstractmethod
  def _gather_logs(self):
    """Gather the Box event logs.

    This function should set a few things on the superclass:
      self._last_timestamp     # Set to the last timestamp/stream position from the logs
      self._more_to_poll       # Set to True if the max # of logs was polled this time


    Returns:
      list or bool: The list of logs fetched from the service, or False if
        there was an error during log collection.
    """
    headers = {'Authorization': 'Bearer {}'.format(self._get_oauth())}
    params = {'stream_position': self._last_timestamp,
              'limit': self._MAX_EVENTS_LIMIT,
              'stream_type': 'admin_logs'}

    # Make the request to the api, resulting in a bool or dict
    response = self._make_request(self._BOX_API_V2_EVENTS_ENDPOINT, headers=headers, params=params)
    if not response:
        return False

    logs = response['entries']

    # Set the last timestamp to the next stream position to be used in the next poll
    self._last_timestamp = response['next_stream_position']

    # Set self._more_to_poll to indicate there are more logs to collect
    self._more_to_poll = len(logs) >= self._MAX_EVENTS_LIMIT

    return logs

  def _get_oauth(self):
    """This should return the oauth token for this request"""
    secret_key = self._config.auth['secret_key']
    client_id = self._config.auth['client_id']
    token = self._config.auth['token']

    # Do something to generate oauth
    return generated_oauth

Metrics

StreamAlert allows you to enable Enhanced Monitoring to surface infrastructure metrics at a granular level.

When enabled, access them by going to AWS Console -> CloudWatch -> Metrics -> Kinesis.

Enhanced metrics can be enabled in conf/global.json as shard_level_metrics, for example:

"shard_level_metrics": [
  "IncomingBytes",
  "IncomingRecords",
  "OutgoingBytes",
  "OutgoingRecords",
  "WriteProvisionedThroughputExceeded",
  "ReadProvisionedThroughputExceeded",
  "IteratorAgeMilliseconds"
]

These metrics can be viewed at the shard-level or the stream-level (cluster/environment).

All of CloudWatch’s features are at your disposal: graphing, dashboards, alerting, and more.

These metrics are useful for debugging, alerting on infrastructure metrics you care about, or for just getting a sense of the scale at which you’re analyzing and alerting on data.

Custom Metrics

By default, StreamAlert will log various custom metrics to AWS CloudWatch Metrics via Amazon CloudWatch Logs Metric Filters.

Amazon CloudWatch Logs Metric Filters utilize Filter Patterns to provide an extremely low-cost and highly scalable approach to tracking custom metrics.

The decision to use Metric Filters vs the CloudWatch PutMetricData API was made easy due to the limitations imposed by the PutMetricData API. StreamAlert has the potential to handle very high throughput, so the posting of metrics needed to be able to keep pace. Metric Filter Patterns leverage the runtime’s existing logger output to generate custom metrics that can be graphed, used for alarms, etc.

StreamAlert logs custom metrics on the cluster level and also on the aggregate (aka: global) level. For example, a metric for FailedParses will be created for each individual cluster that is configured. Along with publishing the metric to the respective cluster’s metrics, the metric will also get published to the aggregate value for this metric. Think of the aggregate as a summation of a metric across the entire StreamAlert deployment.

Custom metrics are logged to a unique StreamAlert namespace within CloudWatch Logs. Navigate to AWS Console -> CloudWatch -> Metrics -> StreamAlert to view these metrics.

Custom metrics definitions are found within streamalert/shared/metrics.py.

Classifier Custom Metrics
  • FailedParses

  • FirehoseFailedRecords

  • FirehoseRecordsSent

  • NormalizedRecords

  • S3DownloadTime

  • SQSFailedRecords

  • SQSRecordsSent

  • TotalProcessedSize

  • TotalRecords

  • TotalS3Records

  • TotalStreamAlertAppRecords

  • TriggeredAlerts

Rules Engine Custom Metrics
  • FailedDynamoWrites

  • TriggeredAlerts

Alert Merger Custom Metrics
  • AlertAttempts

Toggling Custom Metrics

Logging of custom metrics will be enabled by default for the Lambda functions that support this feature.

To globally (for all clusters) disable custom metrics for the classifier function:

python manage.py custom-metrics --disable --functions classifier

To disable custom metrics for the classifier function within a specific cluster:

python manage.py custom-metrics --disable --functions classifier --clusters <CLUSTER>

Swap the --disable flag for --enable in the above commands to have the inverse effect.

Alarms for Custom Metrics

With the addition of custom metrics comes the added bonus of CloudWatch alarms for custom metrics. StreamAlert’s CLI can be used to add alarms on custom metrics as you see fit. Custom metric alarms can be applied to aggregate metrics (across all clusters) or to one or more clusters.

To get an up-to-date list of metrics to which alarms can be assigned on a cluster basis, run:

python manage.py create-cluster-alarm --help

To get an up-to-date list of metrics to which alarms can be assigned on an aggregate/global level, run:

python manage.py create-alarm --help

The required arguments for the create-alarm and create-cluster-alarm commands mimic what is required by AWS CloudWatch Metric’s PutMetricAlarm API.

Example: FailedParses, Cluster

FailedParses alarm at the prod cluster level

python manage.py create-cluster-alarm FailedParsesAlarm \
  --metric FailedParses \
  --metric-target cluster \
  --comparison-operator GreaterThanOrEqualToThreshold \
  --evaluation-periods 1 \
  --period 600 \
  --threshold 5.0 \
  --alarm-description 'Trigger this alarm if 5 or more failed parses occur within a 10 minute period in the cluster "prod"' \
  --clusters prod \
  --statistic Sum
Example: TotalRecords, Global

TotalRecords alarm on a global level

python manage.py create-alarm MinimumTotalRecordsAlarm \
  --metric TotalRecords \
  --metric-target aggregate \
  --comparison-operator LessThanThreshold \
  --evaluation-periods 3 \
  --period 600 \
  --threshold 200000 \
  --alarm-description 'Trigger this alarm if the total incoming records (aggregate) drops below 200000 for 3 consecutive 10 minute time periods in a row' \
  --statistic Sum

The custom metric alarms will notify StreamAlert’s default SNS topic for monitoring: <prefix>_streamalert_monitoring

Rule Staging

Rule Staging allows dynamic toggling of rules from a staging to production state, and vice versa. Staged rules will only send alerts for historical retention, and the alerts will not be sent to any user-defined outputs, such as Slack, PagerDuty, etc.

To ensure that new rules do not flood alerting outputs with a ton of potential false positives, rules can be staged. After the initial staging period, wherein a noisy rule is tuned to limit extra noise or false positives, staged rules can be promoted.

When Rule Staging is enabled, new rules will, by default, be staged upon a deploy of the Rules Engine Lambda function. See the Skip Staging During Deploy section for more information.

Enabling Rule Staging

By default, the rule staging feature is not enabled. It can be enabled with the following command:

python manage.py rule-staging enable --true

The change will be reflected in the conf/global.json file.

For additional configuration options related to this feature, see the Rule Staging section of the Global Settings.

The initial implementation of the Rule Staging feature has a hard-coded ‘staging period’, or the time a rule should remain in staging before being auto-promoted to send to user-defined outputs. This is only relevant if the Rule Promotion feature is also enabled. The current default is 48 hours.

CLI Commands

There are a few CLI commands available to make management of staged rules easier.

Rule Status

The status of rules, meaning whether or not they are staged, can be determined with the following command:

python manage.py rule-staging status

Sample Output:

Rule                                     Staged?
  1: rule_001                            False
  2: rule_002                            True
  3: rule_003                            False
  4: rule_004                            False
Toggling Staged Status

Staging a rule, or list of rules, is possible with the following command:

python manage.py rule-staging stage <rule_001> <rule_002>

Unstaging rules, enabling them to send to all user-defined outputs, is equally as easy and accomplished with the following command:

python manage.py rule-staging unstage <rule_001> <rule_002>
Skip Staging During Deploy

As noted above, all new rules will be staged by default during a Rules Engine deploy when the Rule Staging feature is enabled. There may, however, be occasions when all new rules should not be staged during a deploy. To allow for this, the Rules Engine can be deployed with the following command:

python manage.py deploy --functions rule --skip-rule-staging

This will force all new rules to send to user-defined outputs immediately upon deploy, bypassing the default staging period. Alternatively, the --stage-rules and --unstage-rules flags can be used (instead of the --skip-rule-staging flag) to stage or unstage specific rules only.

Triaging Staged Rules

Once a rule is in staging, alerts generated by that rule can be queried in Athena:

SELECT 'rule_001' as rule_name, count(*) AS alert_count FROM alerts WHERE dt >= '2018-07-25-16' AND rule_name = 'rule_001' AND staged = True
Athena Results

rule_name    alert_count
rule_001     96

To help automate triaging of staged rules, StreamAlert includes an optional Rule Promotion Lambda function. This function can both send alert digests via email and auto-promote rules out of staging. See the Rule Promotion page for more detail.

Rule Promotion

To complement the Rule Staging feature, StreamAlert includes an optional Rule Promotion Lambda function. This Lambda function is invoked on a user-defined interval, and can automatically ‘promote’ rules out of staging.

Once rules are promoted, they will send alerts to all user-defined outputs. The function is also capable of sending digest emails to a Simple Notification Service (SNS) topic with statistics on how many alerts staged rules have generated.

Enabling Rule Promotion

Open the conf/lambda.json file, and find the rule_promotion_config section. Toggling the enabled flag to true will allow for deployment of the Rule Promotion Lambda function.

Example:

conf/lambda.json
{
  "rule_promotion_config": {
    "enabled": true,
    "log_level": "info",
    "log_retention_days": 14,
    "memory": 128,
    "schedule_expression": "rate(10 minutes)",
    "send_digest_schedule_expression": "cron(30 13 * * ? *)",
    "timeout": 120
  }
}

Configuration Options

A few additional configuration options are available to customize the function to your needs.

  • schedule_expression — How often the Rule Promotion Lambda function should be invoked in an attempt to promote currently staged rules. This should use AWS’s Rate or Cron Expression syntax.

  • send_digest_schedule_expression — When to invoke the Rule Promotion Lambda function to send the alert statistics digest. This should use AWS’s Rate or Cron Expression syntax.

Note

If either of the expressions used above are in the cron syntax, keep in mind the execution time will be relative to UTC, not local time.

The initial implementation of the Rule Promotion function has a hard-coded alert threshold, or the number of alerts a rule can safely trigger and still be auto-promoted to send to user-defined outputs.

The current default is 0, meaning any rule that is staged and triggers any alerts during the staging period (default of 48 hours) will not be auto-promoted. Manual promotion is possible via the command outlined in the Rule Staging CLI Commands section.

Deployment

Deploying the Rule Promotion Lambda function is similar to all of StreamAlert’s Lambda functions. The following command will create all of the necessary infrastructure and deploy the Rule Promotion function code.

python manage.py deploy --functions rule_promo

Note

After the above command is run and the function is deployed, users must subscribe to the SNS topic that is created in order to receive the alert statistics digest emails.

Alert Statistics Digest

The alert statistics digest that is sent to the SNS topic will contain information on staging times, as well as the amount of alerts the staged rule has generated to date. If alerts have been triggered, a link to Athena query results will also be included to assist in triaging them.

Sample Digest Email

Alert statistics for 2 staged rule(s) [2018-07-25 13:30:25.915131 UTC]

◦ rule_001
      - Staged At:                    2018-07-18 20:50:04.036690 UTC
      - Staged Until:                 2018-07-20 20:50:04.036690 UTC
      - Time Past Staging:            4d 16h 40m
      - Alert Count:                  20
      - Alert Info:                   https://console.aws.amazon.com/athena/home#query/history/0e86ea19-9449-4140-caaa-594b0979ed3d

◦ rule_002
      - Staged At:                    2018-07-23 22:30:38.823067 UTC
      - Staged Until:                 2018-07-25 22:30:38.823067 UTC
      - Remaining Stage Time:         0d 9h 0m
      - Alert Count:                  2
      - Alert Info:                   https://console.aws.amazon.com/athena/home#query/history/365853eb-ac8f-49b5-9118-c1c6479b2fbd

Scheduled Queries

Overview

Originally dubbed “StreamQuery”, this system allows you to execute Athena queries on a schedule, and funnel their results back into StreamAlert for rules analysis.

Because StreamAlert is mostly stateless, scheduled queries allow you to correlate data and analyze it automatically. Rules that were not previously possible can be written:

  • Detect X failed logins within Y minutes

  • Detect a spike in API behavior that is correlated with an increase in the number of a different alert/rule

  • Detect elevated API error rates from a specific IP address

How do scheduled queries work?

This system leverages two main components: AWS Lambda and AWS Step Functions.

First, a CloudWatch scheduled event triggers the execution of a new AWS Step Function State Machine. This State Machine manages the lifecycle of Athena queries through the Lambda function. Its sole responsibility is to execute the Lambda, wait a predefined window of time, and execute the Lambda again, repeating until the Lambda reports it is finished.

The Lambda function is a simple function that starts Athena queries, caches their execution ids, checks on their execution status, and uploads results to StreamAlert via Kinesis. Instead of doing all of these steps in a blocking fashion and sleeping while it waits for Athena, it runs through all queries in a single nonblocking pass, and returns the result of its execution to the State Machine. Once all queries have completed and their results sent to StreamAlert, the Lambda returns a “done” flag to the State Machine, signalling that this job has been finished.
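Conceptually, the State Machine is a simple poll loop. Below is a minimal Amazon States Language sketch of that loop; the state names, function ARN, and "done" flag are illustrative assumptions, not StreamAlert's actual definition:

{
  "StartAt": "RunQueries",
  "States": {
    "RunQueries": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:scheduled_queries",
      "Next": "CheckDone"
    },
    "CheckDone": {
      "Type": "Choice",
      "Choices": [
        {"Variable": "$.done", "BooleanEquals": true, "Next": "Finished"}
      ],
      "Default": "WaitAndRetry"
    },
    "WaitAndRetry": {"Type": "Wait", "Seconds": 30, "Next": "RunQueries"},
    "Finished": {"Type": "Succeed"}
  }
}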

Configuration

Scheduled Queries is configured via a single file, conf/scheduled_queries.json.

{
  "enabled": true,
  "config": {
    "destination_kinesis": "prefix_prod_streamalert",
    "sfn_timeout_secs": 3600,
    "sfn_wait_secs": 30
  },
  "packs": {
    "hourly": {
      "description": "Runs all hourly queries",
      "schedule_expression": "rate(1 hour)"
    },
    "two_hour": {
      "description": "Runs all queries run every two hours",
      "schedule_expression": "rate(2 hours)"
    },
    "daily": {
      "description": "Runs all daily queries",
      "schedule_expression": "rate(24 hours)"
    },
    "two_day": {
      "description": "Runs all queries that are run once every 2 days",
      "schedule_expression": "rate(2 days)"
    }
  },
  "lambda_config": {}
}
  • enabled — (bool) Pass true to activate ScheduledQueries. Leave false to disable.

  • config.destination_kinesis — (str) The name of the Kinesis stream to upload results to.

  • config.sfn_timeout_secs - (int) Max number of seconds for the state machine to execute.

  • config.sfn_wait_secs - (int) Time to wait between checks of query status.

  • query_packs - (dict) The keys of this dict are the names of the query packs. This section is discussed in more depth below.

Query Packs

Query Packs are batches of scheduled Athena queries that are executed together.

"query_packs": {
  ...
  "hourly": {
    "description": "Runs all hourly queries",
    "schedule_expression": "rate(1 hour)"
  },
  ...
}
  • description - (str) A string summary of what queries belong in this pack.

  • schedule_expression - (str) A CloudWatch schedule expression defining how frequently to execute this query pack.

Again, the keys to the query_packs dict are the names of the query packs. These names are used below.

Writing Queries

After you’ve defined a few Query Packs, it’s time to add actual scheduled queries.

All scheduled queries are located in the scheduled_queries/ directory, located in the root of the project.

from streamalert.scheduled_queries.query_packs.configuration import QueryPackConfiguration

QueryPackConfiguration(
    name='NAME_OF_QUERY_PACK',
    description='Hey, hey! This is a description!',

    # Make sure to edit the database name properly or this query will fail
    # with "insufficient privileges" errors
    query="""
SELECT
  *
FROM
  "ATHENA_DATABASE_NAME"."cloudwatch_cloudtrail"
WHERE
  dt = '{utcdatehour_minus1hour}'

  AND eventsource = 'athena.amazonaws.com'
  AND eventname = 'StartQueryExecution'
""",
    params=['utcdatehour_minus1hour'],
    tags=['sample']
)
  • name - (str) The name of this query. This name is published in the final result, and is useful when writing rules.

  • description - (str) Description of this query. This is published in the final result.

  • query - (str) A template SQL statement sent to Athena, with query parameters identified {like_this}.

  • params - (list[str]|dict[str,callable]) Read on below…

  • tags - (list[str]) Tags required by this query to be run. The simplest way to use this is to put the Query pack name into this array.

params

The “params” option specifies how to calculate special query parameters. It supports two formats.

The first format is a list of strings drawn from a predefined set. These have special values that are calculated at runtime and are interpolated into the template SQL string; utcdatehour_minus1hour in the example above is one such supported string.

The second format is a dictionary mapping parameter names to functions, like so:

from streamalert.shared.lookup_tables.core import LookupTables

def func1(date):
    return date.timestamp()

def func2(date):
    return LookupTables.get('aaaa', 'bbbb')

QueryPackConfiguration(
    ...
    query="""
SELECT *
FROM stuff
WHERE
  dt = '{my_param_1}'
  AND p2 = '{my_param_2}'
""",
    params={
        'my_param_1': func1,
        'my_param_2': func2,
    }
)

Writing Rules for StreamQuery

Classifier Schema

We provide an out-of-box sample schema for scheduled query v1.0.0 results. It is located at conf/schemas/streamquery.json.

What does a scheduled query result look like?

Below is an example of what StreamAlert may receive as a result from a scheduled query.

{
    "streamquery_schema_version": "1.0.0",
    "execution": {
        "name": "query_name_goes_here",
        "description": "This is an example",
        "query": "SELECT *\nFROM my_database.my_table\nWHERE dt = '2020-01-01-01' LIMIT 10",
        "query_parameters": {
            "dt": "2020-01-01-01"
        },
        "data_scanned_in_bytes": 4783293,
        "execution_time_ms": 12374,
        "tags": [ "query_pack_1" ],
        "query_execution_id": "123845ac-273b-ad3b-2812-9812739789",
        "console_link": "https://console.amazonaws.com/athena/somethingsomething"
    },
    "data": {
        "headers": [
            "username",
            "time"
        ],
        "rows": [
            {
                "username": "bob",
                "time": 1
            },
            {
                "username": "sally",
                "time": 2
            },
            {
                "username": "joe",
                "time": 3
            }
        ],
        "count": 3
    }
}

Because the data of each query may be different, it is generally advisable to first write a StreamAlert matcher on the execution.name value of the data; a minimal sketch follows. The rest is up to you!
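In the sketch below, the log name is an assumption based on the sample schema at conf/schemas/streamquery.json, and the query name matches the sample result above:

from streamalert.shared.rule import rule

@rule(logs=['streamquery:results'])  # assumed log name
def suspicious_query_results(rec):
    # Match on the query name first, then inspect the returned rows
    if rec['execution']['name'] != 'query_name_goes_here':
        return False
    return rec['data']['count'] > 0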

Deployment

Deploying the various components of scheduled_queries is easy.

Building the Step Function, Lambda, and Query Packs

Anytime you change the configured query packs, you will need to run this to update the AWS Resources.

% ./manage.py build -t scheduled_queries

Deploying Python Code to Lambda

% ./manage.py deploy -f scheduled_queries

Best Practices

Use cron() instead of rate()

When defining schedule_expressions, it’s safer to use cron(1 * * * *) than rate(1 hour). The reason is that, if you use Terraform to build or rebuild your scheduled queries resources, you may end up recreating the query pack. With rate(1 hour), this causes the CloudWatch event to trigger immediately and then wait in 1-hour increments. With cron(1 * * * *), it is easier to determine exactly when a query pack will be executed; in this case, the 1st minute of every hour.

Be mindful of how much data is being sent

Athena queries can return a TON of data. Remember that this data has to fit in Lambda memory or it will crash your application. Try to structure your queries with GROUP BY statements or restrict the fields they operate on.

CAREFULLY CONSIDER Firehose’ing Scheduled Query results into Athena

It is theoretically possible to Firehose all StreamQuery results received by StreamAlert back into S3, using scheduled queries for data transformation.

We don’t really recommend doing this. It can add significantly more data to the pipeline, and usage of CREATE TABLE AS SELECT is likely a more cost-efficient choice.

Use dt BETWEEN, not dt > Queries

In queries, prefer to be explicit about which partitions to scan. Use clauses like these:

  • dt = {datehour}

  • dt BETWEEN {datehour_minus1hour} AND {datehour}

Avoid things like dt > {datehour_minus1hour}. This creates time-sensitivity in your query, and may cause it to return different results than expected if there is a delay in Step Function execution (see below).
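For example, an explicitly bounded query might look like this sketch; the table and field names are hypothetical, and the parameters reuse the names from the bullets above:

SELECT sourceipaddress, count(*) AS attempts
FROM "ATHENA_DATABASE_NAME"."cloudwatch_cloudtrail"
WHERE dt BETWEEN '{datehour_minus1hour}' AND '{datehour}'
  AND eventname = 'ConsoleLogin'
GROUP BY sourceipaddress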

Neat Little Details

Athena Queries are Incredibly Cheap

At $5 per 1 Terabyte scanned, Athena is absurdly cheap. Go nuts with your scheduled queries!

Failed State Machine Executions are Retriable

AWS Step Functions record every single execution of each State Machine, as well as each state change. Going to the console, you can observe that the Input event of a State Machine execution is simply a JSON blob:

{
  "name": "streamalert_scheduled_queries_cloudwatch_trigger",
  "event_id": "12345678-53e7-b479-0601-1234567890",
  "source_arn": "arn:aws:events:us-east-1:123456789012:rule/myprefix_streamalert_scheduled_queries_event_0",
  "streamquery_configuration": {
    "clock": "2020-02-13T22:06:20Z",
    "tags": [
      "hourly"
    ]
  }
}

Notice the “clock”. This value is generated at the time the CloudWatch scheduled event is triggered. Thus, if you start a new State Machine execution using the exact same Input event (with the same clock), the results of that execution will be exactly (mostly…) the same.

This is useful for replaying failed State Machine executions that result from Athena downtime or deployed bugs. Simply use the AWS Console, navigate to any failed execution, and click the New Execution button, whereupon a form will be shown with a copy of the Input event already pre-populated!

You can manually trigger query executions

Knowing the above, you can force StreamQuery to execute ad hoc queries simply by manually triggering State Machine executions, and passing in a correctly formatted Input event!

Make sure the Input event’s tags and clock are populated correctly to ensure the correct queries are executed.

Normalization

StreamAlert has an unannounced feature, Data Normalization. In its current implementation, it extracts recognized field names from classified records and saves them to a top-level key on the same record.

This is useful for rules, as they can be written to compare data fields against IoCs, such as an IP address, instead of writing one rule for each incoming data type. However, we have identified a couple of limitations while using Normalization internally.

Normalization 2.0 (Reboot)

In Normalization 2.0, we introduce a new Lambda function, Artifact Extractor, which leverages the Amazon Kinesis Data Firehose Data Transformation feature to extract interesting artifacts from records processed by the classifiers. The artifacts will be stored in the same S3 bucket used by the StreamAlert Historical Search feature, and the artifacts will be searchable via Athena as well.

Artifacts Inventory

An artifact is any field or subset of data within a record that bears meaning beyond the record itself, and is of interest in computer security. For example, a “carbonblack_version” would not be an artifact, as it is meaningless outside of the context of Carbon Black data. However, an ip_address would be an artifact.

The Artifact Extractor Lambda function will build an artifacts inventory backed by S3 and Athena. It enables users to search for artifacts across the whole infrastructure from a single Athena table.

Architecture
Normalization V2 Architecture

(click to enlarge)

Configuration

In Normalization v1, the normalized types are based on log source (e.g. osquery, cloudwatch, etc) and defined in conf/normalized_types.json file.

In Normalization v2, the normalized types are based on log type (e.g. osquery:differential, cloudwatch:cloudtrail, cloudwatch:events, etc) and defined in conf/schemas/*.json. Please note, conf/normalized_types.json is now deprecated.

All normalized types are arbitrary, but only lower case alphabetic characters and underscores should be used for names in order to be compatible with Athena.

Supported normalization configuration syntax:

"cloudwatch:events": {
  "schema": {
    "field1": "string",
    "field2": "string",
    "field3": "string"
  },
  "parser": "json",
  "configuration": {
    "normalization": {
      "normalized_key_name1": [
        {
          "path": ["path", "to", "original", "key"],
          "function": "The purpose of normalized_key_name1",
          "condition": {
            "path": ["path", "to", "other", "key"],
            "is|is_not|in|not_in|contains|not_contains": "string or a list"
          },
          "send_to_artifacts": true|false
        }
      ]
    }
  }
}
  • normalized_key_name1: An arbitrary string to name the normalized key, e.g. ip_address, hostname, command etc.

  • path: A list containing the JSON path to the original key to be normalized.

  • function: Describe the purpose of the normalizer.

  • condition: An optional block that is executed first. If the condition is not met, then this normalizer is skipped.

    • path: A list containing the JSON path to the condition key.

    • is|is_not|in|not_in|contains|not_contains: Exactly one of these fields must be provided. This is the value that the condition field is compared against. E.g

      "condition": {
        "path": ["account"],
        "is": "123456"
      }
      
      "condition": {
        "path": ["detail", "userIdentity", "userName"],
        "in": ["root", "test_username"]
      }
      

      Note

      Use all-lowercase strings or a list of lowercase strings in the condition field. The value from the record will be converted to lowercase before comparison.

  • send_to_artifacts: A boolean flag indicating whether the normalized information should be sent to the artifacts table. This field is optional and defaults to true; all normalized information is treated as artifacts unless this flag is explicitly set to false.

Below are some example configurations for normalization v2.

  • Normalize all ip addresses (ip_address) and user identities (user_identity) for cloudwatch:events logs

    conf/schemas/cloudwatch.json

    "cloudwatch:events": {
      "schema": {
        "account": "string",
        "detail": {},
        "detail-type": "string",
        "id": "string",
        "region": "string",
        "resources": [],
        "source": "string",
        "time": "string",
        "version": "string"
      },
      "parser": "json",
      "configuration": {
        "normalization": {
          "ip_address": [
            {
              "path": [
                "detail",
                "sourceIPAddress"
              ],
              "function": "Source IP addresses"
            }
          ],
          "user_identity": [
            {
              "path": ["detail", "userIdentity", "type"],
              "function": "User identity type",
              "send_to_artifacts": false
            },
            {
              "path": ["detail", "userIdentity", "arn"],
              "function": "User identity arn"
            },
            {
              "path": ["detail", "userIdentity", "userName"],
              "function": "User identity username"
            }
          ]
        }
      }
    }
    
  • Normalize all commands (command) and file paths (file_path) for osquery:differential logs

    conf/schemas/osquery.json

    "osquery:differential": {
      "schema": {
        "action": "string",
        "calendarTime": "string",
        "columns": {},
        "counter": "integer",
        "decorations": {},
        "epoch": "integer",
        "hostIdentifier": "string",
        "log_type": "string",
        "name": "string",
        "unixTime": "integer",
        "logNumericsAsNumbers": "string",
        "numerics": "string"
      },
      "parser": "json",
      "configuration": {
        "optional_top_level_keys": [
          "counter",
          "decorations",
          "epoch",
          "log_type",
          "logNumericsAsNumbers",
          "numerics"
        ],
        "normalization": {
          "command": [
            {
              "path": ["columns", "command"],
              "function": "Command line from shell history"
            }
          ],
          "file_path": [
            {
              "path": ["columns", "history_file"],
              "function": "Shell history file path"
            }
          ]
        }
      }
    }
    
  • Normalize username (user_identity) for cloudwatch:events logs when a certain condition is met. In the following example, it will only normalize usernames related to AWS accounts 11111111 and 22222222.

    conf/schemas/cloudwatch.json

    "cloudwatch:events": {
      "schema": {
        "account": "string",
        "detail": {},
        "detail-type": "string",
        "id": "string",
        "region": "string",
        "resources": [],
        "source": "string",
        "time": "string",
        "version": "string"
      },
      "parser": "json",
      "configuration": {
        "normalization": {
          "user_identity": [
            {
              "path": ["detail", "userIdentity", "userName"],
              "function": "User identity username",
              "condition": {
                "path": ["account"],
                "in": ["11111111", "22222222"]
              }
            }
          ]
        }
      }
    }
    
Deployment
  • Artifact Extractor will only work if Firehose and Historical Search are enabled in conf/global.json

    "infrastructure": {
      ...
      "firehose": {
        "use_prefix": true,
        "buffer_interval": 60,
        "buffer_size": 128,
        "enabled": true,
        "enabled_logs": {
          "cloudwatch": {},
          "osquery": {}
        }
      }
      ...
    }
    
  • Enable Artifact Extractor feature in conf/global.json

    "infrastructure": {
      "artifact_extractor": {
        "enabled": true,
        "firehose_buffer_size": 128,
        "firehose_buffer_interval": 900
      },
      "firehose": {
        "use_prefix": true,
        "buffer_interval": 60,
        "buffer_size": 128,
        "enabled": true,
        "enabled_logs": {
          "cloudwatch": {},
          "osquery": {}
        }
      }
      ...
    }
    
  • Running the build CLI will add a few more resources for the Artifact Extractor feature.

    It will add the following resources:

    • A new Glue catalog table artifacts for Historical Search via Athena

    • A new Firehose to deliver artifacts to S3 bucket

    • New permissions

    python manage.py build --target artifact_extractor
    
  • Then deploy the classifier to enable the Artifact Extractor feature.

    python manage.py deploy --function classifier
    

    Note

    If the normalization configuration has changed in conf/schemas/*.json, make sure to deploy the classifier Lambda function for the change to take effect.

Custom Metrics

Three additional custom metrics are added to the Classifier for artifact statistics:

  1. ExtractedArtifacts: Log the number of artifacts extracted from the records

  2. FirehoseFailedArtifacts: Log the number of records (artifacts) that failed to send to Firehose

  3. FirehoseArtifactsSent: Log the number of records (artifacts) sent to Firehose

By default, custom metrics should be enabled in the Classifier, for example in conf/clusters/prod.json:

{
  "id": "prod",
  "classifier_config": {
    "enable_custom_metrics": true,
    ...
  }
}
python manage.py build --target "metric_filters_*"
Artifacts
  1. Artifacts will be searchable within the Athena artifacts table, while original logs are still searchable within their dedicated tables.

Search cloudwatch:events logs:

SELECT *
FROM PREFIX_streamalert.cloudwatch_events
WHERE dt='2020-06-22-23'
Testing Results from cloudwatch_events Table

(click to enlarge)

  2. All artifacts, including artifacts extracted from cloudwatch:events, will live in the artifacts table.

SELECT *
FROM PREFIX_streamalert.artifacts
WHERE dt='2020-06-22-23'
Artifacts from artifacts Table

(click to enlarge)

  3. (Advanced) Use a JOIN search to find the original record(s) associated with the artifacts by streamalert_record_id

SELECT artifacts.*,
         cloudwatch.*
FROM
    (SELECT streamalert_record_id AS record_id,
         type,
         value
    FROM PREFIX_streamalert.artifacts
    WHERE dt ='2020-06-22-23'
            AND type='user_identity'
            AND LOWER(value)='root' LIMIT 10) AS artifacts
LEFT JOIN
    (SELECT streamalert_normalization['streamalert_record_id'] AS record_id, detail
    FROM PREFIX_streamalert.cloudwatch_events
    WHERE dt ='2020-06-22-23' LIMIT 10) AS cloudwatch
    ON artifacts.record_id = cloudwatch.record_id
JOIN Search Result

(click to enlarge)

Note

Instead of issuing two searches, we can use a JOIN statement to search across the two tables at once to find the original record(s) associated with the interesting artifacts. This requires the streamalert_normalization field, which contains streamalert_record_id, to be searchable in the original table. The current process is to add the streamalert_normalization field as an optional top-level key to the schema.

  • Update schema conf/schemas/cloudwatch.json

    "cloudwatch:events": {
      "schema": {
        "account": "string",
        "detail": {},
        "detail-type": "string",
        "id": "string",
        "region": "string",
        "resources": [],
        "source": "string",
        "streamalert_normalization": {},
        "time": "string",
        "version": "string"
      },
      "parser": "json",
      "configuration": {
        "optional_top_level_keys": [
          "streamalert_normalization"
        ],
        "normalization": {
          "user_identity": [
            {
              "path": ["detail", "userIdentity", "type"],
              "function": "User identity type"
            },
            {
              "path": ["detail", "userIdentity", "arn"],
              "function": "User identity arn"
            },
            {
              "path": ["detail", "userIdentity", "userName"],
              "function": "User identity username"
            }
          ]
        }
      }
    }
    
  • Apply the change by running

    python manage.py build --target "kinesis_firehose_*"
    

Considerations

The Normalization Reboot will bring good value in terms of how easy it will be to search for artifacts across the organization's entire infrastructure. It will also make it possible to write more efficient scheduled queries to have correlated alerting in place. But it is worth mentioning that there are some tradeoffs: additional resources are required, and additional data delay is introduced.

  1. Increase in Data Footprint: Each individual original record has the chance to add many artifacts. In practice, this will likely not be a huge issue, as each artifact is very small and only contains a few fields.

  2. Additional Delay: Firehose data transformation will add up to 900 additional seconds of delay before the data is available for historical search. 900 seconds is a configurable setting on the Firehose from which the artifacts are extracted; reduce the Firehose buffer_interval value if you want to reduce the delay.

  3. High memory usage: The Artifact Extractor Lambda function may need at least 3x the maximum buffer size of the Firehoses from which the artifacts are extracted, because lots of data copying happens inside the function. This may be improved by writing more efficient code in the Artifact Extractor Lambda function.

Example Schemas

For additional background on schemas, see Schemas.

JSON

CloudWatch
Example Log
{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2015-12-22T18:43:48Z",
  "region": "us-east-1",
  "resources": [
    "arn:aws:ec2:us-east-1:123456789012:instance/i-12345678"
  ],
  "detail": {
    "instance-id": "i-12345678",
    "state": "terminated"
  }
}
Schema
{
  "cloudwatch:ec2_event": {
    "schema": {
      "version": "string",
      "id": "string",
      "detail-type": "string",
      "source": "string",
      "account": "integer",
      "time": "string",
      "region": "string",
      "resources": [],
      "detail": {
        "instance-id": "string",
        "state": "string"
      }
    },
    "parser": "json"
  }
}
Inspec
Example Log
{
  "version": "1.4.1",
  "profiles": [
    {
      "supports": [],
      "controls": [
        {
          "title": null,
          "desc": null,
          "impact": 0.5,
          "refs": [],
          "tags": {},
          "code": "code-snip",
          "source_location": {
            "ref": "/lib/inspec/control_eval_context.rb",
            "line": 87
          },
          "id": "(generated from osquery.rb:6 de0aa7d2405c27dfaf34a56e2aa67842)",
          "results": [
            {
              "status": "passed",
              "code_desc": "File /var/osquery/osquery.conf should be file",
              "run_time": 0.001332,
              "start_time": "2017-01-01 00:00:00 -0700"
            }
          ]
        }
      ],
      "groups": [
        {
          "title": null,
          "controls": [
            "(generated from osquery.rb:1 813971f93b6f1a66e85f6541d49bbba5)",
            "(generated from osquery.rb:6 de0aa7d2405c27dfaf34a56e2aa67842)"
          ],
          "id": "osquery.rb"
        }
      ],
      "attributes": []
    }
  ],
  "other_checks": [],
  "statistics": {
    "duration": 0.041876
  }
}
Schema
{
  "inspec": {
    "schema": {
      "title": "string",
      "desc": "string",
      "impact": "float",
      "refs": [],
      "tags": {},
      "code": "string",
      "id": "string",
      "source_location": {
        "ref": "string",
        "line": "integer"
      },
      "results": []
    },
    "parser": "json",
    "configuration": {
      "json_path": "profiles[*].controls[*]"
    }
  }
}
Box.com
Example Log
{
  "source": {
    "item_type": "file",
    "item_id": "111111111111",
    "item_name": "my-file.pdf",
    "parent": {
      "type": "folder",
      "name": "Files",
      "id": "22222222222"
    }
  },
  "created_by": {
    "type": "user",
    "id": "111111111",
    "name": "User Name",
    "login": "user.name@domain.com"
  },
  "created_at": "2017-01-01T00:00:00-07:00",
  "event_id": "111ccc11-7777-4444-aaaa-dddddddddddddd",
  "event_type": "EDIT",
  "ip_address": "127.0.0.1",
  "type": "event",
  "session_id": null,
  "additional_details": {
    "shared_link_id": "sadfjaksfd981348fkdqwjwelasd9f8",
    "size": 14212335,
    "ekm_id": "111ccc11-7777-4444-aaaa-dddddddddd",
    "version_id": "111111111111",
    "service_id": "5555",
    "service_name": "Box Sync for Mac"
  }
}
Schema
{
  "box": {
    "schema": {
      "source": {
        "item_type": "string",
        "item_id": "integer",
        "item_name": "string",
        "parent": {
          "type": "string",
          "name": "string",
          "id": "integer"
        }
      },
      "created_by": {
        "type": "string",
        "id": "integer",
        "name": "string",
        "login": "string"
      },
      "created_at": "string",
      "event_id": "string",
      "event_type": "string",
      "ip_address": "string",
      "type": "string",
      "session_id": "string",
      "additional_details": {}
    },
    "parser": "json"
  }
}
CloudWatch VPC Flow Logs

AWS VPC Flow Logs can be delivered to StreamAlert via CloudWatch.

CloudWatch logs are delivered as a nested record, so we will need to pass configuration options to the parser to find the nested records:

Schema
{
  "cloudwatch:flow_logs": {
    "schema": {
      "protocol": "integer",
      "source": "string",
      "destination": "string",
      "srcport": "integer",
      "destport": "integer",
      "action": "string",
      "packets": "integer",
      "bytes": "integer",
      "windowstart": "integer",
      "windowend": "integer",
      "version": "integer",
      "eni": "string",
      "account": "integer",
      "flowlogstatus": "string"
    },
    "parser": "json",
    "configuration": {
      "json_path": "logEvents[*].extractedFields",
      "envelope_keys": {
        "logGroup": "string",
        "logStream": "string",
        "owner": "integer"
      }
    }
  }
}
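For illustration, here is a hedged sketch of the nested payload the configuration above targets (the field values are hypothetical): json_path selects each logEvents[*].extractedFields record, while envelope_keys copies logGroup, logStream, and owner onto every extracted record:

# Trimmed, hypothetical CloudWatch Logs payload
payload = {
    "owner": "111111111111",
    "logGroup": "vpc-flow-logs",
    "logStream": "eni-0123456789abcdef0",
    "logEvents": [
        {"extractedFields": {"protocol": "6", "source": "10.0.0.1", "action": "ACCEPT"}},
        {"extractedFields": {"protocol": "17", "source": "10.0.0.2", "action": "REJECT"}},
    ],
}

# envelope_keys are promoted onto each record selected by json_path
envelope = {key: payload[key] for key in ("logGroup", "logStream", "owner")}
records = [dict(event["extractedFields"], **envelope) for event in payload["logEvents"]]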
osquery

Osquery’s schema changes depending on the SELECT statement used and the table queried. There are several options when writing schemas for these logs.

Schema, Option #1

Define a schema for each table used:

{
  "osquery:etc_hosts": {
    "parser": "json",
    "schema": {
      "name": "string",
      "columns": {
        "address": "string",
        "hostnames": "string"
      },
      "action": "string",
      "field...": "type..."
    }
  },
  "osquery:listening_ports": {
    "parser": "json",
    "schema": {
      "name": "string",
      "columns": {
        "pid": "integer",
        "port": "integer",
        "protocol": "integer",
        "field...": "type..."
      },
      "action": "string",
      "field...": "type..."
    }
  }
}

This approach promotes Rule safety, but requires additional time to define the schemas.

Schema, Option #2

Define a “loose” schema which captures arbitrary values for a given field:

{
  "osquery:differential": {
    "parser": "json",
    "schema": {
      "name": "string",
      "hostIdentifier": "string",
      "calendarTime": "string",
      "unixTime": "integer",
      "columns": {},
      "action": "string"
    }
  }
}

Note

A value of {} for columns above indicates that a map with any key/value pairs is acceptable.

Warning

In Option #2, the schema definition is flexible, but Rule safety is lost: you will need to program defensively when accessing and analyzing fields in columns. Using req_subkeys is advised in this case; see Rules for additional details, and the sketch below.
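As a minimal sketch of defensive rule-writing under this loose schema (the import path, query name, and output are assumptions and may vary by StreamAlert version):

from streamalert.shared.rule import rule

@rule(
    logs=['osquery:differential'],
    outputs=['aws-sns:security'],  # hypothetical output
    # Drop records whose loose 'columns' map lacks these keys, restoring
    # some of the safety lost with a {} schema
    req_subkeys={'columns': ['address', 'hostnames']})
def localhost_etc_hosts(record):
    """Alert on /etc/hosts entries mapping a hostname to localhost."""
    # Defensive access: .get() avoids a KeyError if a field is still missing
    return (record['name'] == 'etc_hosts_query'  # hypothetical query name
            and record['columns'].get('address') == '127.0.0.1')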

CSV

See CSV Parsing

Key-Value (KV)

auditd
Example Log
type=SYSCALL msg=audit(1364481363.243:24287): arch=c000003e syscall=2 success=no exit=-13
a0=7fffd19c5592 a1=0 a2=7fffd19c4b50 a3=a items=1 ppid=2686 pid=3538 auid=500 uid=500
gid=500 euid=500 suid=500 fsuid=500 egid=500 sgid=500 fsgid=500 tty=pts0 ses=1 comm="cat"
exe="/bin/cat" subj=unconfined_u:unconfined_r:unconfined_t:s0-s0:c0.c1023 key="sshd_config"
type=CWD msg=audit(1364481363.243:24287):  cwd="/home/shadowman" type=PATH msg=audit(1364481363.243:24287):
item=0 name="/etc/ssh/sshd_config" inode=409248 dev=fd:00 mode=0100600 ouid=0 ogid=0
rdev=00:00 obj=system_u:object_r:etc_t:s0
Schema
{
  "example_auditd": {
    "parser": "kv",
    "schema": {
      "type": "string",
      "msg": "string",
      "arch": "string",
      "syscall": "string",
      "success": "string",
      "exit": "string",
      "a0": "string",
      "a1": "string",
      "a2": "string",
      "a3": "string",
      "items": "string",
      "ppid": "integer",
      "pid": "integer",
      "auid": "integer",
      "uid": "integer",
      "gid": "integer",
      "euid": "integer",
      "suid": "integer",
      "fsuid": "integer",
      "egid": "integer",
      "sgid": "integer",
      "fsgid": "integer",
      "tty": "string",
      "ses": "string",
      "comm": "string",
      "exe": "string",
      "subj": "string",
      "key": "string",
      "type_2": "string",
      "msg_2": "string",
      "cwd": "string",
      "type_3": "string",
      "msg_3": "string",
      "item": "string",
      "name": "string",
      "inode": "string",
      "dev": "string",
      "mode": "integer",
      "ouid": "integer",
      "ogid": "integer",
      "rdev": "string",
      "obj": "string"
    },
    "configuration": {
      "delimiter": " ",
      "separator": "="
    }
  }
}

Note

The value for parser above should be set to kv for key-value parsing. The delimiter and separator keys within configuration specify the token delimiter and the key/value separator, respectively.
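To illustrate the mechanics, here is a minimal sketch (this is not StreamAlert's actual parser, and repeated-key suffixing such as type_2 is omitted):

def parse_kv(line, delimiter=' ', separator='='):
    """Split a log line into key/value pairs using the configured tokens."""
    data = {}
    for token in line.split(delimiter):
        if separator not in token:
            continue  # skip stray tokens with no separator
        key, _, value = token.partition(separator)
        data[key] = value.strip('"')
    return data

record = parse_kv('type=SYSCALL success=no comm="cat" exe="/bin/cat"')
assert record == {'type': 'SYSCALL', 'success': 'no', 'comm': 'cat', 'exe': '/bin/cat'}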

Syslog

See Syslog Parsing

Troubleshooting

Kinesis Data Streams

As detailed in other sections, StreamAlert utilizes Amazon Kinesis Data Streams.

Review Kinesis Streams key concepts

ThroughputExceeded

Pertains to the WriteProvisionedThroughputExceeded metric or the ProvisionedThroughputExceededException error

The documentation linked above states:

  • “Each shard can support up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys)”

  • “Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MB, up to a limit of 5 MB for the entire request, including partition keys”

If you’re experiencing either error, one of the following holds true:

  1. You are exceeding 1000 records/s write on at least one shard

  2. You are exceeding 1MB/s on at least one shard

  3. You sent > 500 records in a single PutRecords request

  4. You sent a record > 1MB

  5. You are sending > 5MB in a PutRecords request to at least one shard

(1), (2), and (5) can be addressed by allocating more shards or by using a partition key that distributes writes more evenly across shards

(3) and (4) can be addressed by using code or agents with the proper limit checks; see the batching sketch below

How you set up your partition keys depends on your use cases, scale, and how you're sending your data.

In our experience, there are three common use-cases:

  • No partition key (small scale)

  • Per-batch partition key (medium scale)

  • Per-record partition key (larger scale)

Explanation: A PutRecords request can have up to 500 records amounting to a total of 5 MB. If you're using a per-batch partition key, that means you're attempting to send up to 5 MB to a single shard that has a limit of 1 MB/s. Keep in mind: if your code/agent uses splay or has reasonable retry logic, an error or exception does not imply data loss, and this approach may still be viable.
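For example, a minimal boto3 sketch (the stream name is hypothetical) that stays under the 500-record/5 MB PutRecords limits and uses a per-record partition key to spread writes across shards:

import json
import uuid

import boto3

MAX_RECORDS = 500            # PutRecords limit per request
MAX_BYTES = 5 * 1024 * 1024  # PutRecords limit, including partition keys

def send_events(events, stream_name='<prefix>_streamalert_data'):
    # Note: individual records over 1 MB must still be split or rejected upstream
    client = boto3.client('kinesis')
    batch, batch_bytes = [], 0
    for event in events:
        data = json.dumps(event).encode()
        key = uuid.uuid4().hex  # per-record partition key
        size = len(data) + len(key)
        if batch and (len(batch) == MAX_RECORDS or batch_bytes + size > MAX_BYTES):
            client.put_records(StreamName=stream_name, Records=batch)
            batch, batch_bytes = [], 0
        batch.append({'Data': data, 'PartitionKey': key})
        batch_bytes += size
    if batch:
        client.put_records(StreamName=stream_name, Records=batch)

A production sender would also retry any records reported in the response's FailedRecordCount; see the retry sketch in the next section.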

StreamAlert enables AWS Enhanced Monitoring to help you diagnose these types of issues via shard-level metrics. Simply go to CloudWatch -> Metrics -> Kinesis. This also allows you to measure IncomingBytes and IncomingRecords.

DescribeStream: Rate exceeded

Or DescribeDeliveryStream: Rate exceeded.

This API call is limited to 10 requests/s. Your agent/code should not use this call to determine whether the Kinesis Stream is available to receive data; it should simply attempt to send the data and gracefully handle any exceptions, as sketched below.
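A minimal sketch of that pattern, assuming boto3 and a hypothetical stream name; note that put_records can partially fail, so the response must be checked as well:

import time

import boto3

client = boto3.client('kinesis')

def put_with_retry(records, stream_name='<prefix>_streamalert_data', attempts=5):
    for attempt in range(attempts):
        try:
            response = client.put_records(StreamName=stream_name, Records=records)
        except client.exceptions.ProvisionedThroughputExceededException:
            time.sleep(2 ** attempt)  # back off, then retry everything
            continue
        if response['FailedRecordCount'] == 0:
            return
        # Retry only the throttled/failed records
        records = [record for record, result in zip(records, response['Records'])
                   if 'ErrorCode' in result]
        time.sleep(2 ** attempt)
    raise RuntimeError('records still failing after retries')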

certificate verify failed

Run the following command on the impacted host, choosing the correct region:

openssl s_client -showcerts -connect kinesis.us-west-2.amazonaws.com:443

If this returns Verify return code: 0 (ok), your agent/code needs to use Amazon's root and/or intermediate certificates (PEM) for TLS to function properly.
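If your code uses boto3, one hedged option (the bundle path below is hypothetical) is to point the client at a PEM bundle containing those certificates explicitly:

import boto3

client = boto3.client(
    'kinesis',
    region_name='us-west-2',
    verify='/etc/ssl/certs/amazon-ca-bundle.pem',  # PEM bundle with Amazon's CAs
)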

FAQ

Frequently Asked Questions

What language is StreamAlert written in?

  • StreamAlert is written in Python

What license is StreamAlert released under?

  • StreamAlert is released under the Apache License, Version 2.0

How much does StreamAlert cost?

  • StreamAlert is open source (free)

What/How can I send data to StreamAlert?

  • Data can be sent via Amazon Kinesis Data Streams or S3 buckets, using agents or code such as Fluentd, Logstash, Kinesis Agent, or osquery

What scale does StreamAlert operate at?

  • StreamAlert utilizes Kinesis Streams, which can “continuously capture and store terabytes of data per hour from hundreds of thousands of sources” [1]

What’s the maintenance/operational overhead?

  • Limited; StreamAlert utilizes Terraform, Kinesis Streams and AWS Lambda, which means you don’t have to manually provision, manage, patch or harden any servers

Does StreamAlert support analytics, metrics or time series use-cases?

  • StreamAlert itself does not support analytics, metrics, or time series use-cases. StreamAlert can send data to such tools, or you can use one of the many great open source and commercial offerings in this space, including but not limited to Prometheus, Datadog, and New Relic.

Is StreamAlert intended for synchronous (blocking) or asynchronous decision making?

  • StreamAlert is intended for asynchronous decision making.

Contact Us

Don’t see your question answered here?

Feel free to open an issue, submit a PR, and/or reach out to us on Slack.

Alternatives

The correct choice depends on your use cases, existing infrastructure, security requirements, available resources, core competencies, and more. The details outlined below highlight notable differences and are not intended as a complete, detailed comparison.

ElastAlert

Infrastructure

ElastAlert assumes you have an existing Elasticsearch cluster; it schedules queries against it

StreamAlert directly ingests data from S3 buckets or other sources like Fluentd, Logstash, Kinesis Agent, osquery, PHP, Java, Ruby, etc. via Amazon Kinesis Data Streams

Rules/Queries

ElastAlert uses YAML files and Elasticsearch’s Query DSL. It supports query types that StreamAlert currently does not, ex: Change, Frequency, Spike, Flatline, New Term, Cardinality

StreamAlert uses JSON configuration files, and rules/queries are written in Python; they can utilize any Python libraries or functions

Security

In ElastAlert, TLS and authentication are optional (an Elasticsearch concern); they can be enabled via Elastic Shield/X-Pack.

StreamAlert requires TLS for data transport (a Kinesis requirement), and authentication is required via AWS Identity and Access Management (IAM)

Etsy’s 411

Infrastructure

411 assumes you have an existing Elasticsearch cluster; it schedules queries against it

StreamAlert directly ingests data from S3 buckets or other sources like Fluentd, Logstash, Kinesis Agent, osquery, PHP, Java, Ruby, etc. via Amazon Kinesis Data Streams

Rules/Queries

411 uses a custom query language called ESQuery, “Pipelined Lucene shorthand”, which is then translated to Elasticsearch’s Query DSL

StreamAlert rules/queries are written in Python; they can utilize any Python libraries or functions.

Security

411:

  • Infrastructure: Apache (with mod_rewrite and mod_headers), PHP, SQLite, and MySQL. You are responsible for hardening and vulnerability management of these applications and the underlying host/operating system.

  • AuthN/AuthZ: The UI is accessed via username/password over TLS. TLS and authentication are optional for Elasticsearch; they can be enabled via Elastic Shield/X-Pack

StreamAlert:

  • Infrastructure: Serverless; underlying operating system is hardened and updated by Amazon. Application is Python and runs in a short-lived container/sandbox.

  • Requires TLS for data transport (Kinesis requirement)

  • AuthN/AuthZ is required (AWS Identity and Access Management (IAM))