
Overview¶
StreamAlert is a serverless, real-time data analysis framework which empowers you to ingest, analyze, and alert on data from any environment, using data sources and alerting logic you define. Computer security teams use StreamAlert to scan terabytes of log data every day for incident detection and response.
Incoming log data will be classified and processed by the rules engine. Alerts are then sent to one or more outputs.

Features¶
Rules are written in Python; they can utilize any Python libraries or functions
Ingested logs and generated alerts can be retroactively searched for compliance and research
Serverless design is cheaper, easier to maintain, and scales to terabytes per day
Deployment is automated: simple, safe and repeatable for any AWS account
Secure by design: least-privilege execution, containerized analysis, and encrypted data storage
Merge similar alerts and automatically promote new rules if they are not too noisy
Built-in support for dozens of log types and schemas
Built-in collection of broadly applicable community rules
Fully open source and customizable: add your own log schemas, rules, and alert outputs
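To give a flavor of the first feature, a StreamAlert rule is just a decorated Python function that receives a parsed record and returns True to trigger an alert. The sketch below is illustrative only: the decorator is a stand-in for the real @rule decorator the framework provides, and the log type and output names (osquery:differential, aws-sns:security) are hypothetical.

```python
# Illustrative sketch of StreamAlert's rule style; the real framework
# provides the @rule decorator, which also registers the rule with the
# rules engine. This stand-in only attaches metadata to the function.

def rule(logs=None, outputs=None):
    """Minimal stand-in for StreamAlert's @rule decorator."""
    def wrapper(func):
        func.logs = logs or []
        func.outputs = outputs or []
        return func
    return wrapper

@rule(logs=['osquery:differential'], outputs=['aws-sns:security'])
def suspicious_sudo(record):
    """Alert when an osquery result shows a sudo invocation."""
    return 'sudo' in record.get('columns', {}).get('cmdline', '')
```

Because rules are plain Python, they can import and use any library available to the Lambda function.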
Ready? Let’s get started!
Resources¶
Table of Contents¶
Getting Started¶
It only takes a few minutes to get StreamAlert up and running! These instructions have been tested on macOS, but should also work on most Linux systems.
Install Dependencies¶
brew install terraform # MacOS Homebrew
terraform --version # Must be >= v0.13.0
Note
Terraform versions lower than 0.13 are not supported. We recommend installing Terraform version 0.13.0 or greater.
If you are using Linux, you may need to install the Python development libraries:
sudo apt install python-dev # Debian
sudo yum install python-devel # CentOS/RHEL
Download StreamAlert¶
Clone the latest stable release of StreamAlert:
git clone --branch stable https://github.com/airbnb/streamalert.git
Create and activate a virtual environment:
cd streamalert
python3.7 -m venv venv
source venv/bin/activate
Install the StreamAlert requirements:
pip install -r requirements.txt
Run unit tests to make sure everything is installed correctly:
tests/scripts/unit_tests.sh
Configure AWS Credentials¶
1. Create an AWS account and an IAM user with permissions for at least the following services:
Athena
CloudTrail
CloudWatch Events and Logs
DynamoDB
Glue
IAM
Kinesis Firehose and Streams
KMS
Lambda
S3
SNS
SQS
2. Configure your AWS credentials:
pip install --user awscli
aws configure
Deploy¶
Note
StreamAlert supports Terraform’s native ability to lock the remote S3 state file whenever a user is planning and applying Terraform configuration. This prevents multiple users from deploying StreamAlert at the same time, which could otherwise leave the state broken. StreamAlert will automatically create and destroy the DynamoDB table used for this locking via the command line interface. See Terraform’s documentation for more information.
1. Set basic StreamAlert configuration options:
python manage.py configure aws_account_id 111111111111 # Replace with your 12-digit AWS account ID
python manage.py configure prefix <value> # Choose a unique name prefix (alphanumeric characters only)
Note
Update the file_format value in conf/lambda.json. Valid options are parquet or json. The default value will be parquet in a future release, but this must be manually configured at this time.
"athena_partitioner_config": {
"concurrency_limit": 10,
"file_format": "parquet",
"log_level": "info"
}
More information can be found on the historical search page.
2. Build the StreamAlert infrastructure for the first time:
python manage.py init
There will be multiple Terraform prompts, type “yes” at each one to continue.
Note
You only need to run python manage.py init once for any given StreamAlert deployment, although it is safe to run again if necessary.
3. At this point, StreamAlert is up and running! You can, for example, see the S3 buckets that were automatically created:
aws s3 ls | grep streamalert
You can also login to the AWS web console and see StreamAlert’s CloudWatch logs, Lambda functions, etc.
Live Test¶
Now let’s upload some data and trigger an alert to see StreamAlert in action! This example uses SNS for both sending the log data and receiving the alert, but StreamAlert also supports many other data sources and alert outputs.
1. Create two SNS topics:
aws sns create-topic --name streamalert-test-data
aws sns create-topic --name streamalert-test-alerts
2. Export some environment variables for easy reuse later:
export SA_REGION=us-east-1 # StreamAlert deployment region
export SA_ACCOUNT=111111111111 # AWS account ID
export SA_EMAIL=email@domain.com # Email to receive an SNS notification
3. Subscribe your email to the alerts SNS topic:
aws sns subscribe --topic-arn arn:aws:sns:$SA_REGION:$SA_ACCOUNT:streamalert-test-alerts \
--protocol email --notification-endpoint $SA_EMAIL
Note
You will need to click the verification link in your email to activate the subscription.
4. Add the streamalert-test-data
SNS topic as an input to the (default) prod
cluster.
Open conf/clusters/prod.json
and change the streamalert
module to look like this:
{
"classifier_config": {
"enable_custom_metrics": true,
"inputs": {
"aws-sns": [
"arn:aws:sns:REGION:ACCOUNTID:streamalert-test-data"
]
},
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
}
}
5. Tell StreamAlert which log schemas will be sent to this input.
Open conf/clusters/prod.json
and change the data_sources
section to look like this:
{
"data_sources": {
"sns": {
"streamalert-test-data": [
"cloudwatch"
]
}
}
}
6. Add the alert topic as a StreamAlert output:
python manage.py output set aws-sns
Please supply a short and unique descriptor for this SNS topic: test-email
Please supply SNS topic name: streamalert-test-alerts
If you look at conf/outputs.json
, you’ll notice that the SNS topic was automatically added.
7. Configure a rule to send to the alerts topic.
We will use rules/community/cloudwatch_events/cloudtrail_root_account_usage.py
as an example, which
alerts on any usage of the root AWS account. Change the rule decorator to:
@rule(
logs=['cloudwatch:events'],
req_subkeys={'detail': ['userIdentity', 'eventType']},
outputs=['aws-sns:test-email'] # Add this line
)
def cloudtrail_root_account_usage(rec):
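For context, the logic inside this community rule boils down to a few field checks on the CloudWatch event. The body below is a paraphrase; see rules/community/cloudwatch_events/cloudtrail_root_account_usage.py for the authoritative version.

```python
# Paraphrased sketch of the community rule's logic: alert on root account
# activity that is not an AWS service event and was not invoked by an AWS
# service acting on the account's behalf.
def cloudtrail_root_account_usage(rec):
    return (
        rec['detail']['userIdentity']['type'] == 'Root'
        and rec['detail']['userIdentity'].get('invokedBy') is None
        and rec['detail']['eventType'] != 'AwsServiceEvent'
    )
```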
8. Now update StreamAlert with these changes:
# Hook the streamalert-test-data SNS topic up to the StreamAlert Classifier function
python manage.py build
# Deploy a new version of all of the Lambda functions with the updated rule and config files
python manage.py deploy
Note
Use build
and deploy
to apply any changes to StreamAlert’s
configuration or Lambda functions, respectively. Some changes (like this example) require both.
9. Time to test! Create a file named cloudtrail-root.json with the following contents:
{
"account": "1234",
"detail": {
"eventType": "AwsConsoleSignIn",
"userIdentity": {
"type": "Root"
}
},
"detail-type": "CloudTrail Test",
"id": "1234",
"region": "us-east-1",
"resources": [],
"source": "1.1.1.2",
"time": "now",
"version": "2018"
}
This is only a rough approximation of what the real log might look like, but good enough for our purposes. Then send it off to the data SNS topic:
aws sns publish --topic-arn arn:aws:sns:$SA_REGION:$SA_ACCOUNT:streamalert-test-data \
--message "$(cat cloudtrail-root.json)"
If all goes well, an alert should arrive in your inbox within a few minutes! If not, look for any errors in the CloudWatch Logs for the StreamAlert Lambda functions.
10. After 10 minutes (the default refresh interval), the alert will also be searchable from Amazon Athena. Select your StreamAlert database in the dropdown on the left and preview the alerts table:
(Here, my name prefix is testv2.) If no records are returned, look for errors in the Athena Partitioner function or try invoking it directly.
And there you have it! Ingested log data is parsed, classified, and scanned by the rules engine. Any resulting alerts are delivered to your configured output(s) within a matter of minutes.
Architecture¶
StreamAlert consists of multiple AWS components connected together and managed by Terraform.

Data Lifecycle¶
1. Log data can come through any of the supported data sources. This includes Kinesis, S3, SNS, or using a StreamAlert App to periodically poll data from a third-party API.
2. Inbound logs are first ingested via the “classifier” Lambda function in one of your clusters. The classifier is the first and most substantial component of StreamAlert, responsible for parsing, classifying, and performing normalization on logs. The classifier(s) can also optionally forward the resulting logs from classification to Firehose for historical retention and searching via Athena. The same results also get sent to an SQS Queue for further downstream analysis.
3. The SQS Queue that the classifier function(s) send data to is then read by a “rules engine” Lambda function. Note that this function is not “clustered” like the classifier(s) and is global to the StreamAlert deployment. The data read from the queue is run through the defined rules, and any alerts that are triggered are sent to a DynamoDB table. Optionally, the rules engine function can read Threat Intelligence information from DynamoDB, or perform other lookup-style operations using data stored in S3.
4. The “alert merger” Lambda function regularly scans the alerts DynamoDB table. When new alerts arrive, they are either forwarded immediately (by default) or, if merge options are specified, they are bundled together with similar alerts before proceeding to the next stage.
5. The “alert processor” Lambda function is responsible for actually delivering the alert to its configured outputs. All alerts implicitly include a Firehose output, which feeds an S3 bucket that can be queried with Athena. Alerts will be retried indefinitely until they are successfully delivered, at which point they will be removed from the DynamoDB table.
6. An Athena Partitioner Lambda function runs periodically to onboard new StreamAlert data and alerts into their respective Athena databases for historical search.
Other StreamAlert components include DynamoDB tables and Lambda functions for optional rule promotion and regularly updating threat intelligence information.
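The merging behavior in the alert merger stage can be pictured with a small sketch. This is an illustration of the idea only, not the alert merger's actual implementation: alerts whose records agree on a set of merge keys are bundled into one group.

```python
from collections import defaultdict

def merge_alerts(alerts, merge_keys):
    """Group alerts whose record values agree on every merge key.

    Illustrative only: the real alert merger also applies a merge window
    and other options before bundling.
    """
    buckets = defaultdict(list)
    for alert in alerts:
        bucket = tuple(alert['record'].get(key) for key in merge_keys)
        buckets[bucket].append(alert)
    return list(buckets.values())
```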
Datatypes¶
StreamAlert supports the following datatypes:
JSON¶
{"type": "json"}
CSV¶
csv,data,example
Key-Value¶
type=kv data=example
Syslog¶
Jun 15 00:00:40 host1.mydomain.io application[pid] syslog message.
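To make the four datatypes concrete, here is a minimal sketch of how a classifier might dispatch on a schema's parser type. This is a hypothetical helper, not StreamAlert's real parser code, which additionally handles schemas, optional fields, and nested data; the syslog regex below is one plausible pattern for the example line above.

```python
import csv
import io
import json
import re

# Hypothetical syslog pattern matching "Mon DD HH:MM:SS host app message".
SYSLOG_RE = re.compile(
    r'^(?P<timestamp>\w{3}\s+\d+\s[\d:]+)\s(?P<host>\S+)\s'
    r'(?P<application>\S+)\s(?P<message>.*)$'
)

def try_parse(raw, parser):
    """Parse a raw log line according to a schema's parser type."""
    if parser == 'json':
        return json.loads(raw)
    if parser == 'csv':
        return next(csv.reader(io.StringIO(raw)))
    if parser == 'kv':
        return dict(pair.split('=', 1) for pair in raw.split())
    if parser == 'syslog':
        return SYSLOG_RE.match(raw).groupdict()
    raise ValueError('unsupported parser: {}'.format(parser))
```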
Datasources¶
StreamAlert supports the following services as datasources:
Amazon S3
Amazon Kinesis Data Streams
Amazon Simple Notification Service (SNS)
These services above can accept data from:
Log Forwarding Agents
Custom Applications
AWS CloudTrail
Amazon CloudWatch Events
And more
To configure datasources for a cluster, read datasource configuration.
Amazon S3¶
StreamAlert supports data analysis and alerting for logs written to Amazon S3 buckets.
This is achieved via Amazon S3 Event Notifications from an event type of s3:ObjectCreated:*
.
Example AWS use-cases:
AWS Config logs
S3 Server Access logs
Example non-AWS use-cases:
Host logs (syslog, auditd, osquery, …)
Network logs (Palo Alto Networks, Cisco, …)
Web Application logs (Apache, nginx, …)
SaaS logs (Box, GSuite, OneLogin, …)
Amazon Kinesis Data Streams¶
StreamAlert also utilizes Amazon Kinesis Data Streams for real-time data ingestion and analysis. By default, StreamAlert creates an Amazon Kinesis Data Stream per cluster.
Amazon SNS¶
Amazon Simple Notification Service (SNS) is a flexible, fully managed pub/sub messaging notification service for coordinating the delivery of messages to subscribing endpoints and clients.
StreamAlert can utilize SNS as an input for processing.
Use-cases:
Receiving messages from other AWS services
Global Settings¶
Settings that apply globally to StreamAlert are stored in the conf/global.json file. This file has a few different sections to help organize the settings by purpose. These sections are described in further detail below.
Account¶
The account section of the conf/global.json file is used to store information specifically related to the AWS account used for your StreamAlert deployment.
Configuration¶
{
"account": {
"aws_account_id": "123456789012",
"prefix": "<prefix>",
"region": "us-east-1"
}
}
Options¶
Key | Required | Default | Description
aws_account_id | Yes | — | 12-digit account ID for your AWS account
prefix | Yes | — | An alphanumeric and unique prefix to be used for your deployment
region | Yes | us-east-1 | AWS region within which you would like to deploy StreamAlert
Tip
The aws_account_id
and prefix
settings can be set using the CLI:
python manage.py configure aws_account_id 111111111111 # Replace with your 12-digit AWS account ID
python manage.py configure prefix <value> # Choose a unique name prefix (alphanumeric characters only)
However, if a different region is desired, it must be changed manually.
General¶
The general section of the conf/global.json file is used to store general information related to your StreamAlert deployment. Notably, paths to rules and matchers can be supplied here.
Configuration¶
{
"general": {
"terraform_files": [
"/absolute/path/to/extra/terraform/file.tf"
],
"matcher_locations": [
"matchers"
],
"rule_locations": [
"rules"
],
"scheduled_query_locations": [
"scheduled_queries"
],
"publisher_locations": [
"publishers"
],
"third_party_libraries": [
"pathlib2==2.3.5"
]
}
}
Options¶
Key | Required | Default | Description
matcher_locations | Yes | ["matchers"] | List of local paths where matchers are defined
rule_locations | Yes | ["rules"] | List of local paths where rules are defined
scheduled_query_locations | Yes | ["scheduled_queries"] | List of local paths where scheduled queries are defined
publisher_locations | Yes | ["publishers"] | List of local paths where publishers are defined
third_party_libraries | No | [] | List of third party dependencies that should be installed via pip
terraform_files | No | [] | List of local paths to Terraform files that should be included as part of this StreamAlert deployment
Infrastructure¶
The infrastructure section of the conf/global.json file is used to store settings for the various global resources and infrastructure components needed by StreamAlert. There are various subsections within this section, each of which is outlined below.
Alerts Firehose¶
By default, StreamAlert will send all alert payloads to S3 for historical retention and searching. These payloads include the original record data that triggered the alert, as well as the rule that was triggered, the source of the log, the date/time the alert was triggered, the cluster from which the log came, and a variety of other fields.
Configuration¶
The following alerts_firehose
configuration settings can be defined within the infrastructure
section of global.json
:
{
"infrastructure": {
"alerts_firehose": {
"bucket_name": "<prefix>-streamalerts",
"buffer_size": 64,
"buffer_interval": 300,
"cloudwatch_log_retention": 14
}
}
}
Key | Required | Default | Description
bucket_name | No | <prefix>-streamalerts | Bucket name to override the default name
buffer_size | No | 64 | Buffer incoming data to the specified size, in megabytes, before delivering it to S3
buffer_interval | No | 300 | Buffer incoming data for the specified period of time, in seconds, before delivering it to S3
cloudwatch_log_retention | No | 14 | Days for which to retain error logs that are sent to CloudWatch in relation to this Kinesis Firehose Delivery Stream
Alerts Table¶
StreamAlert utilizes a DynamoDB Table as a temporary storage mechanism when alerts are triggered from the Rules Engine. This table can be configured as necessary to scale to the throughput of your alerts.
Configuration¶
The following alerts_table
configuration settings can be defined within the infrastructure
section of global.json
:
{
"infrastructure": {
"alerts_table": {
"read_capacity": 10,
"write_capacity": 10
}
}
}
Key | Required | Default | Description
read_capacity | No | 5 | Read capacity value to apply to the alerts DynamoDB Table
write_capacity | No | 5 | Write capacity value to apply to the alerts DynamoDB Table
Classifier SQS¶
StreamAlert sends all classified logs to an SQS Queue. This queue is then read from by the Rules Engine function to perform rule analysis.
Configuration¶
Note
These configuration options are only available for legacy purposes and may be removed in a future release. They will typically only be needed if manually migrating from v2 to v3+.
The following classifier_sqs
configuration settings can be defined within the infrastructure
section of global.json
:
{
"infrastructure": {
"classifier_sqs": {
"use_prefix": true
}
}
}
Key | Required | Default | Description
use_prefix | No | true | Whether the prefix should be prepended to the classified logs SQS Queue that is created (set to false only for legacy purposes)
Firehose (Historical Data Retention)¶
StreamAlert also supports sending all logs to S3 for historical retention and searching based on classified type of the log. Kinesis Data Firehose Delivery Streams are used to send the data to S3.
Configuration¶
The following firehose
configuration settings can be defined within the infrastructure
section of global.json
:
{
"infrastructure": {
"firehose": {
"enabled": true,
"bucket_name": "<prefix>-streamalert-data",
"buffer_size": 64,
"buffer_interval": 300,
"enabled_logs": {
"osquery": {
"enable_alarm": true
},
"cloudwatch:cloudtrail": {},
"ghe": {
"enable_alarm": true,
"evaluation_periods": 10,
"period_seconds": 3600,
"log_min_count_threshold": 100000
}
}
}
}
}
Key | Required | Default | Description
enabled | Yes | — | If set to true, classified logs will be delivered to Kinesis Firehose for historical retention
use_prefix | No | true | Whether the prefix should be prepended to Firehoses that are created (only to be used for legacy purposes)
bucket_name | No | <prefix>-streamalert-data | Bucket name to override the default name
buffer_size | No | 64 | Buffer incoming data to the specified size, in megabytes, before delivering it to S3
buffer_interval | No | 300 | Buffer incoming data for the specified period of time, in seconds, before delivering it to S3
enabled_logs | No | {} | Which classified log types to send to Kinesis Firehose from the Classifier function, along with specific settings per log type
Note
The enabled_logs
object should contain log types for which Firehoses should be created.
The keys in the ‘dictionary’ should reference the log type (or subtype) for which Firehoses
should be created, and the value should be additional (optional) settings per log type. The
following section contains more detail on these settings.
enabled_logs¶
The enabled_logs section of the firehose settings must explicitly specify the log types for which you would like to enable historical retention. There are two syntaxes you may use to specify log types:
parent log type:
osquery
log subtype:
osquery:differential
The former will create Firehose resources for all osquery
subtypes, while the latter
will only create one Firehose for specifically the osquery:differential
subtype.
Since each Firehose that gets created can have additional settings applied to it, the proper way to
simply enable given log types is to add items to enabled_logs
as follows (note the empty
JSON object as the value):
{
"infrastructure": {
"firehose": {
"enabled_logs": {
"osquery": {},
"cloudwatch:cloudtrail": {}
}
}
}
}
Each Firehose that is created can be configured with an alarm that will fire when the incoming
log volume drops below a specified threshold. This is disabled by default, and can be enabled
by setting enable_alarm
to true
within the configuration for the log type.
Key | Required | Default | Description
enable_alarm | No | false | If set to true, a CloudWatch Metric Alarm will be created for this log type
evaluation_periods | No | 1 | Consecutive periods the records count threshold must be breached before triggering an alarm
period_seconds | No | 86400 | Period over which to count the IncomingRecords (default: 86400 seconds [1 day])
log_min_count_threshold | No | 1000 | Alarm if IncomingRecords count drops below this value in the specified period(s)
alarm_actions | No | (monitoring SNS topic) | Optional CloudWatch alarm action or list of CloudWatch alarm actions (e.g. SNS topic ARNs)
Note
See the ghe
log type in the example firehose
configuration above for how this can be performed.
When adding a log type to the enabled_logs configuration, a dedicated Firehose is created for each of that log type's subtypes.
For instance, suppose the following schemas are defined across one or more files in the conf/schemas
directory:
{
"cloudwatch:events": {
"parser": "json",
"schema": {"key": "type"}
},
"cloudwatch:cloudtrail": {
"parser": "json",
"schema": {"key": "type"}
},
"osquery:differential": {
"parser": "json",
"schema": {"key": "type"}
},
"osquery:status": {
"parser": "json",
"schema": {"key": "type"}
}
}
Supposing also that the above enabled_logs
example is used, the
following Firehose resources will be created:
<prefix>_streamalert_cloudwatch_cloudtrail
<prefix>_streamalert_osquery_differential
<prefix>_streamalert_osquery_status
Note
Notice that there is no Firehose created for the cloudwatch:events
log type. This is because
this log type was not included in the enabled_logs
configuration, and only the
cloudwatch:cloudtrail
subtype of cloudwatch
was included.
Each Delivery Stream delivers data to the same S3 bucket created by the module in a prefix based on the corresponding log type:
arn:aws:s3:::<prefix>-streamalert-data/cloudwatch_cloudtrail/YYYY/MM/DD/data_here
arn:aws:s3:::<prefix>-streamalert-data/osquery_differential/YYYY/MM/DD/data_here
arn:aws:s3:::<prefix>-streamalert-data/osquery_status/YYYY/MM/DD/data_here
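The expansion rules above can be sketched in a few lines. This is an approximation of the naming behavior described here, not StreamAlert's actual code; the prefix and schema names in the test are taken from the examples above.

```python
def firehose_names(prefix, schemas, enabled_logs):
    """Expand enabled_logs against defined schemas into delivery stream names.

    A parent log type (e.g. "osquery") expands to every "osquery:*" subtype,
    while a subtype (e.g. "cloudwatch:cloudtrail") maps only to itself.
    The ':' separator is replaced with '_' in the stream name.
    """
    names = set()
    for log in enabled_logs:
        if ':' in log:
            matches = [log] if log in schemas else []
        else:
            matches = [s for s in schemas if s.split(':')[0] == log]
        for match in matches:
            names.add('{}_streamalert_{}'.format(prefix, match.replace(':', '_')))
    return sorted(names)
```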
Depending on your log volume, you may need to request limit increases for Firehose:
Kinesis Firehose Limits
Kinesis Firehose Delivery Settings
Monitoring¶
StreamAlert can send notifications of issues with infrastructure to an SNS topic (aka “monitoring” the health of your infrastructure).
Configuration¶
The following monitoring
configuration settings can be defined within the infrastructure
section of global.json
:
{
"infrastructure": {
"monitoring": {
"sns_topic_name": "name-of-existing-sns-topic-to-use"
}
}
}
Key | Required | Default | Description
sns_topic_name | No | — | Name of an existing SNS Topic to which monitoring information should be sent instead of the default one that will be created
Rule Staging¶
StreamAlert comes with the ability to stage rules that have not been battle tested. This feature is backed by a DynamoDB table, for which there are a few configurable options.
Configuration¶
{
"infrastructure": {
"rule_staging": {
"cache_refresh_minutes": 10,
"enabled": true,
"table_read_capacity": 5,
"table_write_capacity": 5
}
}
}
Key | Required | Default | Description
enabled | No | false | Should be set to true to enable the rule staging feature
cache_refresh_minutes | No | 10 | Maximum amount of time (in minutes) the Rules Engine function should wait to force refresh the rule staging information
table_read_capacity | No | 5 | DynamoDB read capacity to allocate to the table that stores staging information. The default setting should be sufficient in most use cases.
table_write_capacity | No | 5 | DynamoDB write capacity to allocate to the table that stores staging information. The default setting should be sufficient in most use cases.
Tip
By default, the rule staging feature is not enabled. It can be enabled with the following command:
python manage.py rule-staging enable --true
S3 Access Logging¶
StreamAlert will send S3 Server Access logs generated by all the buckets in your deployment to a logging bucket that will be created by default. However, if you have an existing bucket where you are already centralizing these logs, the name may be provided for use by StreamAlert’s buckets.
Configuration¶
The following s3_access_logging
configuration settings can be defined within the
infrastructure
section of global.json
:
{
"infrastructure": {
"s3_access_logging": {
"bucket_name": "name-of-existing-bucket-to-use"
}
}
}
Key | Required | Default | Description
bucket_name | No | — | Name of existing S3 bucket to use for logging instead of the default bucket that will be created
Terraform¶
StreamAlert uses Terraform to maintain its infrastructure as code, and Terraform utilizes a remote state that is stored on S3. By default, a bucket will be created for use by Terraform, but an existing bucket name can also be supplied instead. The terraform section of the conf/global.json file should be used to store these settings.
Configuration¶
{
"terraform": {
"bucket_name": "<prefix>-streamalert-terraform-state",
"state_key_name": "streamalert_state/terraform.tfstate"
}
}
Options¶
Key | Required | Default | Description
bucket_name | No | <prefix>-streamalert-terraform-state | Name of existing S3 bucket to use for the Terraform remote state instead of the default bucket that will be created
state_key_name | No | streamalert_state/terraform.tfstate | Name to use as the key of the Terraform state object in S3
Cluster Settings¶
Inbound data is directed to one of StreamAlert’s clusters, each with its own data sources and classifier function. For many applications, one cluster may be enough. However, adding additional clusters can potentially improve performance. For example, you could have:
A cluster dedicated to StreamAlert apps
A separate cluster for each of your inbound Kinesis Data Streams
A separate cluster for data from each environment (prod, staging, corp, etc)
Note
Alerting and historical search components are global and not tied to a specific cluster, although alerts will indicate their originating cluster.
Each cluster is defined by its own JSON file in the conf/clusters directory. To add a new cluster, simply create a new JSON file with the cluster name and fill in your desired configuration, described below.
Changes to cluster configuration can be applied with one of the following:
python manage.py build # Apply all changes
python manage.py build --target cloudwatch_monitoring_* # Only apply changes to CloudWatch Monitoring module for all clusters
Required Settings¶
There are a few top-level settings that are required within each cluster configuration file.
Notably, these settings configure the data_sources
and classifier_config
.
Datasource Configuration¶
A cluster’s classifier function knows what types of data it should parse based on defined
data_sources
within each cluster configuration file in the conf/clusters
directory.
For background on supported data source types, read Datasources.
Note
As of release 3.0.0, data source configuration has moved from sources.json into the data_sources top-level key for each of your clusters.
Each data source (kinesis
, s3
, or sns
) contains a mapping of specific resource names
(Kinesis stream names, S3 bucket names, etc) along with a list of log types that should be expected
from that source.
Note
Log schemas are defined in one or more files in the conf/schemas
directory. See
the Schemas page for more information, or the
Example Schemas page for some sample log definitions.
Each log in the list of logs instructs StreamAlert’s classifier function to attempt to parse incoming data from the given resource as that log type. The classifier will only try to parse incoming data as types that are defined for each resource. If new types of data are fed into one of the sources defined here, but this configuration is not updated, it will result in failed parses in the classifier function.
Example¶
{
"data_sources": {
"kinesis": {
"abc_corporate_streamalert": [
"cloudwatch",
"cloudtrail"
],
"abc_production_stream_streamalert": [
"osquery"
]
},
"s3": {
"abc.ghe.logs": [
"ghe"
],
"abc.hids.logs": [
"carbonblack"
]
},
"sns": {
"abc_sns_topic": [
"fleet"
]
}
}
}
Important
Any data source log type that is listed must have an associated log definition within your schemas definitions.
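A quick way to sanity-check this invariant yourself is a small helper like the following (a hypothetical script, not part of the StreamAlert CLI), which reports any data source log type lacking a schema definition:

```python
def undefined_log_types(data_sources, schemas):
    """Return (service, resource, log_type) tuples whose log type has no
    corresponding schema definition. Subtype references like "osquery:status"
    are matched against their parent type.
    """
    defined = {name.split(':')[0] for name in schemas}
    missing = []
    for service, resources in data_sources.items():
        for resource, log_types in resources.items():
            for log in log_types:
                if log.split(':')[0] not in defined:
                    missing.append((service, resource, log))
    return missing
```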
Classifier Configuration¶
A cluster’s classifier function is a required component and configurable with a few settings that
may be tweaked to help with performance or cost. These exist as part of the classifier_config
block within each cluster configuration file in the conf/clusters
directory.
Example: Basic Cluster¶
{
"id": "minimal-cluster",
"classifier_config": {
"enable_custom_metrics": true,
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
},
"data_sources": {
"kinesis": {
"abc_corporate_streamalert": [
"cloudwatch"
]
}
}
}
Example: Classifier with SNS Inputs¶
{
"id": "sns-inputs",
"classifier_config": {
"enable_custom_metrics": true,
"inputs": {
"aws-sns": [
"arn:aws:sns:REGION:ACCOUNT:TOPIC_NAME"
]
},
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
},
"data_sources": {
"sns": {
"TOPIC_NAME": [
"cloudwatch"
]
}
}
}
Options¶
Key | Default | Description
enable_custom_metrics | true | Enable custom metrics for the cluster
enable_threat_intel | false | Toggle threat intel integration (beta)
inputs | {} | SNS topics which can invoke the classifier function (see example)
log_level | info | Lambda CloudWatch logging level
memory | — | Lambda function memory (MB)
timeout | — | Lambda function timeout (seconds)
Modules¶
Optional configuration settings are divided into different modules, which should be defined within
the modules
section of each cluster configuration file in the conf/clusters
directory.
CloudTrail¶
StreamAlert has native support for enabling and monitoring AWS CloudTrail.
This module is implemented by terraform/modules/tf_cloudtrail.
Example: CloudTrail via S3 Events¶
{
"id": "cloudtrail-s3-events",
"classifier_config": {
"enable_custom_metrics": true,
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
},
"data_sources": {
"s3": {
"abc-prod-streamalert-cloudtrail": [
"cloudtrail"
]
}
},
"modules": {
"cloudtrail": {
"s3_settings": {
"enable_events": true
}
}
}
}
This creates a new CloudTrail and an S3 bucket for the resulting logs. Each new object in the bucket
will invoke the StreamAlert classifier function via S3 events. For this data, rules should be written
against the cloudtrail:events
log type.
Example: CloudTrail via CloudWatch Logs¶
{
"id": "cloudtrail-via-cloudwatch",
"classifier_config": {
"enable_custom_metrics": true,
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
},
"data_sources": {
"kinesis": {
"abc_prod_streamalert": [
"cloudwatch"
]
}
},
"modules": {
"cloudtrail": {
"s3_settings": {
"enable_events": true
},
"send_to_cloudwatch": true
},
"kinesis": {
"streams": {
"retention": 96,
"shards": 10
}
},
"kinesis_events": {
"batch_size": 10,
"enabled": true
}
}
}
This also creates the CloudTrail and S3 bucket, but now the CloudTrail logs are also delivered to a CloudWatch Logs group that forwards them to a Kinesis stream via a CloudWatch Logs Subscription Filter. This can scale to higher throughput, since StreamAlert does not have to download potentially very large files from S3. In this case, rules should be written against the cloudwatch:cloudtrail log type.
Options¶
Key | Default | Description
enabled | true | Toggle the cloudtrail module on and off
enable_logging | true | Toggle to false to pause logging to the CloudTrail
exclude_home_region_events | false | Ignore events from the StreamAlert deployment region. This only has an effect if send_to_cloudwatch is set to true
is_global_trail | true | If true, the CloudTrail is applied to all regions
send_to_cloudwatch | false | Enable CloudTrail delivery to CloudWatch Logs. Logs sent to CloudWatch Logs are forwarded to this cluster’s Kinesis stream for processing. If this is enabled, the enable_events option within s3_settings should be disabled to avoid duplicative processing
cloudwatch_destination_arn | (Computed from CloudWatch Logs Destination module) | CloudWatch Destination ARN used for forwarding data to this cluster’s Kinesis stream. This has a default value but can be overridden here with a different CloudWatch Logs Destination ARN
send_to_sns | false | Create an SNS topic to which notifications should be sent when CloudTrail puts a new object in the S3 bucket. The topic name will be the same as the S3 bucket name
allow_cross_account_sns | false | Allow account IDs specified in the cross_account_ids list within s3_settings to also send SNS notifications to the created SNS Topic
s3_settings | None | Configuration options for CloudTrail related to S3. See the S3 Options section below for details.
S3 Options¶
The cloudtrail
module has a subsection of s3_settings
, which contains options related to S3.
Key | Default | Description
cross_account_ids | [] | Grant write access to the CloudTrail S3 bucket for these account IDs. The primary, aka deployment, account ID will be added to this list.
enable_events | false | Enable S3 events for the logs sent to the S3 bucket. These will invoke this cluster’s classifier for every new object in the CloudTrail S3 bucket
ignore_digest | true | If set to true, S3 events will not be generated for CloudTrail digest files
bucket_name | <prefix>-<cluster>-streamalert-cloudtrail | Name of the S3 bucket to be used for the CloudTrail logs. This can be overridden, but defaults to <prefix>-<cluster>-streamalert-cloudtrail
event_selector_type | “” | An S3 event selector to enable object level logging for the account’s S3 buckets. Choices are: “ReadOnly”, “WriteOnly”, “All”, or “”, where “” disables object level logging for S3
CloudWatch Events¶
StreamAlert supports ingestion of events published to CloudWatch Events for processing.
This module is implemented by terraform/modules/tf_cloudwatch_events.
Note
The Kinesis module must also be enabled.
Example¶
{
"id": "cloudwatch-events-example",
"classifier_config": {
"enable_custom_metrics": true,
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
},
"data_sources": {
"kinesis": {
"abc_prod_streamalert": [
"cloudwatch"
]
}
},
"modules": {
"cloudwatch_events": {
"event_pattern": {
"account": [
"123456789012"
],
"detail-type": [
"EC2 Instance Launch Successful",
"EC2 Instance Launch Unsuccessful",
"EC2 Instance Terminate Successful",
"EC2 Instance Terminate Unsuccessful"
]
},
"cross_account": {
"accounts": {
"123456789012": [
"us-east-1"
]
},
"organizations": {
"o-aabbccddee": [
"us-east-1"
]
}
}
},
"kinesis": {
"streams": {
"retention": 96,
"shards": 10
}
},
"kinesis_events": {
"batch_size": 100,
"enabled": true
}
}
}
This creates a CloudWatch Events Rule that will publish all events that match the provided
event_pattern
to the Kinesis Stream for this cluster. Note in the example above that a custom
event_pattern
is supplied, but may be omitted entirely. To override the default event_pattern
(shown below), a value of None
or {}
may also be supplied to capture all events,
regardless of which account the logs came from. In this case, rules should be written against
the cloudwatch:events
log type.
Options¶
Key | Default | Description
event_pattern | {"account": ["<account_id>"]} | The CloudWatch Events pattern to control what is sent to Kinesis
cross_account | None | Configuration options to enable cross account access for specific AWS Accounts and Organizations. See the Cross Account Options section below for details.
Cross Account Options¶
The cross_account
section of the cloudwatch_events
module has two subsections, outlined here. Usage of these is also shown in the example above.
Key | Default | Description
accounts | None | A mapping of account IDs and regions for which cross account access should be enabled. Example: {"123456789012": ["us-east-1"]}
organizations | None | A mapping of organization IDs and regions for which cross account access should be enabled. Example: {"o-aabbccddee": ["us-east-1"]}
CloudWatch Logs¶
StreamAlert makes it easy to ingest CloudWatch Logs from any AWS account. A common use case is to ingest and scan CloudTrail from multiple AWS accounts (delivered via CloudWatch Logs), but you could also ingest any application logs delivered to CloudWatch.
Note
The Kinesis module must also be enabled.
This module is implemented by terraform/modules/tf_cloudwatch_logs_destination.
Example¶
{
"id": "cloudwatch-logs-example",
"classifier_config": {
"enable_custom_metrics": true,
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
},
"data_sources": {
"kinesis": {
"abc_prod_streamalert": [
"cloudwatch"
]
}
},
"modules": {
"cloudwatch_logs_destination": {
"cross_account_ids": [
"111111111111"
],
"enabled": true,
"excluded_regions": [
"ap-northeast-1",
"ap-northeast-2",
"ap-southeast-2"
]
},
"kinesis": {
"streams": {
"retention": 96,
"shards": 10
}
},
"kinesis_events": {
"batch_size": 100,
"enabled": true
}
}
}
This creates an IAM role for CloudWatch subscriptions, authorized to gather logs from the StreamAlert account as well as account 111111111111, in all regions except the Asia-Pacific regions listed.
Once you have applied this change to enable StreamAlert to subscribe to CloudWatch logs, you need to
create a subscription filter
in the producer account to actually deliver the logs, optionally with
Terraform.
The CloudWatch logs destination ARN will be
arn:aws:logs:REGION:STREAMALERT_ACCOUNT:destination:streamalert_CLUSTER_cloudwatch_to_kinesis
.
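As an illustration, a subscription filter can be created in the producer account with the AWS CLI. The log group name, region, account ID, and cluster below are placeholders; substitute your own values and the destination ARN described above:

```shell
# Run from the *producer* account. All names and IDs here are placeholders.
# An empty --filter-pattern forwards every event in the log group.
aws logs put-subscription-filter \
  --log-group-name "/my-app/logs" \
  --filter-name "streamalert_delivery" \
  --filter-pattern "" \
  --destination-arn "arn:aws:logs:us-east-1:123456789012:destination:streamalert_prod_cloudwatch_to_kinesis"
```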
Options¶
Key | Default | Description
enabled | — | Toggle the cloudwatch_logs_destination module on and off
cross_account_ids | [] | Authorize StreamAlert to gather logs from these accounts
excluded_regions | [] | Do not create CloudWatch Log destinations in these regions
CloudWatch Monitoring¶
To ensure data collection is running smoothly, we recommend enabling CloudWatch metric alarms to monitor the health of the classifier Lambda function(s) and, if applicable, the respective Kinesis stream.
This module is implemented by terraform/modules/tf_monitoring.
Example¶
{
"id": "cloudwatch-monitoring-example",
"classifier_config": {
"enable_custom_metrics": true,
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
},
"data_sources": {
"kinesis": {
"abc_prod_streamalert": [
"cloudwatch"
]
}
},
"modules": {
"cloudwatch_monitoring": {
"enabled": true,
"kinesis_alarms_enabled": true,
"lambda_alarms_enabled": true,
"settings": {
"lambda_invocation_error_threshold": 0,
"lambda_throttle_error_threshold": 0,
"kinesis_iterator_age_error_threshold": 1000000,
"kinesis_write_throughput_exceeded_threshold": 10
}
}
}
}
This enables both the Kinesis and Lambda alarms and illustrates how the alarm thresholds can be tuned. A total of 5 alarms will be created:
Classifier Lambda function invocation errors
Classifier Lambda function throttles
Classifier Lambda function iterator age, applicable only for Kinesis invocations
Kinesis iterator age
Kinesis write exceeded
Options¶
Key | Default | Description
enabled | false | Toggle the cloudwatch_monitoring module on and off
kinesis_alarms_enabled | true | Toggle the Kinesis-specific metric alarms
lambda_alarms_enabled | true | Toggle the Lambda-specific metric alarms
settings | {} | Alarm-specific settings (see below)
There are three settings for a CloudWatch alarm:
Period is the length of time to evaluate the metric
Evaluation Periods is the number of periods over which to evaluate the metric
Threshold is the upper or lower bound after which the alarm will trigger
The following options are available in the settings
dictionary:
Key | Default
lambda_invocation_error_threshold | 0
lambda_invocation_error_evaluation_periods | 1
lambda_invocation_error_period | 300
lambda_throttle_error_threshold | 0
lambda_throttle_error_evaluation_periods | 1
lambda_throttle_error_period | 300
kinesis_iterator_age_error_threshold | 1000000
kinesis_iterator_age_error_evaluation_periods | 1
kinesis_iterator_age_error_period | 300
kinesis_write_throughput_exceeded_threshold | 10
kinesis_write_throughput_exceeded_evaluation_periods | 6
kinesis_write_throughput_exceeded_period | 300
Receiving CloudWatch Metric Alarms¶
By default, StreamAlert automatically creates a <prefix>_streamalert_monitoring
SNS topic that receives
CloudWatch metric alarm notifications. If you would instead like to use an existing SNS topic for
metric alarms, see the Monitoring configuration settings for how
to set this up.
In either case, to receive notifications for metric alarms, simply subscribe to the SNS topic.
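For example, an email subscription can be added with the AWS CLI; the topic ARN below is a placeholder built from the naming convention above:

```shell
# Placeholder region, account ID, prefix, and email address.
aws sns subscribe \
  --topic-arn "arn:aws:sns:us-east-1:123456789012:myprefix_streamalert_monitoring" \
  --protocol email \
  --notification-endpoint "security-team@example.com"
```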
Kinesis (Data Streams)¶
This module creates a Kinesis Data Stream in the cluster, which is the most common approach for StreamAlert data ingestion. In fact, the CloudTrail, CloudWatch Logs, and VPC Flow Logs cluster modules all rely on Kinesis streams for data delivery.
Each Kinesis stream is a set of shards, which in aggregate determine the total data capacity of the stream. Indeed, this is the primary motivation for StreamAlert’s cluster design - each cluster can have its own data stream whose shard counts can be configured individually.
This module is implemented by terraform/modules/tf_kinesis_streams.
Example¶
{
"id": "kinesis-example",
"classifier_config": {
"enable_custom_metrics": true,
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
},
"data_sources": {
"kinesis": {
"abc_prod_streamalert": [
"cloudwatch"
]
}
},
"modules": {
"kinesis": {
"streams": {
"create_user": true,
"retention": 24,
"shard_level_metrics": [
"IncomingBytes",
"IncomingRecords",
"IteratorAgeMilliseconds",
"OutgoingBytes",
"OutgoingRecords",
"WriteProvisionedThroughputExceeded"
],
"shards": 1,
"terraform_outputs": [
"user_arn",
"access_key_id",
"secret_key"
]
}
},
"kinesis_events": {
"batch_size": 100,
"enabled": true
}
}
}
This creates a Kinesis stream and an associated IAM user and hooks up stream events to the
StreamAlert classifier function in this cluster. The terraform_outputs
section instructs
Terraform to print the IAM User’s ARN, along with the access key ID and secret key
for the newly created user.
Options¶
The kinesis
module expects a single key (streams
) whose value is a dictionary with the
following options:
Key | Default | Description
create_user | false | Create an IAM user authorized to write records to the stream
retention | — | Length of time (hours) data records remain in the stream
shard_level_metrics | [] | Enable these enhanced shard-level metrics
shards | — | Number of shards (determines stream data capacity)
trusted_accounts | [] | Authorize these account IDs to assume an IAM role which can write to the stream
stream_name | <prefix>_<cluster>_streamalert_kinesis | [optional] Custom name for the stream that will be created
Kinesis Scaling¶
If the need arises to scale a Kinesis Stream, the process below is recommended.
First, update the Kinesis Stream shard count with the following command:
aws kinesis update-shard-count \
--stream-name <prefix>_<cluster>_streamalert_kinesis \
--target-shard-count <new_shard_count> \
--scaling-type UNIFORM_SCALING
AWS CLI reference for update-shard-count
Repeat this process for each cluster in your deployment.
Note
It may take several minutes, or longer, for new shards to be created.
Then, update each respective cluster configuration file with the updated shard count.
Finally, apply the Terraform changes to ensure a consistent state.
python manage.py build --target kinesis
Kinesis Events¶
The Kinesis Events module connects a Kinesis Stream to the classifier Lambda function.
Note
The Kinesis module must also be enabled.
This module is implemented by terraform/modules/tf_kinesis_events.
Options¶
Key | Default | Description
batch_size | 100 | Max records the classifier function can receive per invocation
enabled | false | Toggle the Kinesis events on and off
VPC Flow Logs¶
VPC Flow Logs capture information about the IP traffic going to and from an AWS VPC.
When writing rules for this data, use the cloudwatch:flow_logs
log source.
Note
The Kinesis module must also be enabled.
This module is implemented by terraform/modules/tf_flow_logs.
Example¶
{
"id": "prod",
"classifier_config": {
"enable_custom_metrics": true,
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
},
"data_sources": {
"kinesis": {
"abc_prod_streamalert": [
"cloudwatch:flow_logs"
]
}
},
"modules": {
"flow_logs": {
"enis": [],
"enabled": true,
"subnets": [
"subnet-12345678"
],
"vpcs": [
"vpc-ed123456"
]
},
"kinesis": {
"streams": {
"retention": 24,
"shards": 10
}
},
"kinesis_events": {
"batch_size": 2,
"enabled": true
}
}
}
This creates the <prefix>_prod_streamalert_flow_logs
CloudWatch Log Group, adds flow logs
to the specified subnet, eni, and vpc IDs with the log group as their target, and adds a CloudWatch
Logs Subscription Filter to that log group to send to Kinesis for consumption by StreamAlert.
Options¶
Key | Default | Description
enabled | — | Toggle the flow_logs module on and off
… | — | Toggle flow log creation
log_retention | — | Days for which logs should be retained in the log group
enis | [] | Add flow logs for these ENIs
subnets | [] | Add flow logs for these VPC subnet IDs
vpcs | [] | Add flow logs for these VPC IDs
Note
One of the following must be set for this module to have any result: enis
, subnets
, or vpcs
S3 Events¶
You can enable S3 event notifications on any of your S3 buckets to invoke the StreamAlert classifier function. When the StreamAlert classifier function receives this notification, it downloads the object from S3 and runs each record through the classification logic.
This module is implemented by terraform/modules/tf_s3_events.
Example¶
{
"id": "s3-events-example",
"classifier_config": {
"enable_custom_metrics": true,
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"timeout": 60
},
"data_sources": {
"s3": {
"bucket_name_01": [
"cloudtrail"
],
"bucket_name_02": [
"cloudtrail"
]
}
},
"modules": {
"s3_events": {
"bucket_name_01": [
{
"filter_prefix": "AWSLogs/1234",
"filter_suffix": ".log"
},
{
"filter_prefix": "AWSLogs/5678"
}
],
"bucket_name_02": []
}
}
}
This configures the two buckets (bucket_name_01
and bucket_name_02
) to notify the classifier
function in this cluster when new objects arrive in the bucket at the specified (optional) prefix(es),
provided the objects have the specified (optional) suffix(es). Additionally, this will authorize the
classifier to download objects from each bucket.
Options¶
The s3_events
module expects a dictionary/map of bucket names, where the value for each key
(bucket name) is a list of maps. Each map in the list can include optional prefixes (filter_prefix
)
and suffixes (filter_suffix
) to which the notification should be applied. The mere existence of a
bucket name in this map within this module implicitly enables event notifications for said bucket.
Note that the value specified for the map of prefixes and suffixes can be an empty list ([]
).
An empty list will enable event notifications for all objects created in the bucket by default.
See the above example for how prefixes/suffixes can be (optionally) specified (as in “bucket_name_01”) and how to use the empty list to enable bucket-wide notifications (as in “bucket_name_02”).
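The prefix/suffix semantics follow S3’s event notification filter rules; conceptually, an object key triggers a notification only when it matches both filters. A minimal sketch of that behavior (not StreamAlert code, just the matching rule):

```python
def matches_filter(key, filter_prefix="", filter_suffix=""):
    """Mirror S3 notification filtering: the object key must start with the
    prefix and end with the suffix; empty values match everything."""
    return key.startswith(filter_prefix) and key.endswith(filter_suffix)

# Objects under AWSLogs/1234 ending in .log would notify; others would not.
matches_filter("AWSLogs/1234/abc.log", "AWSLogs/1234", ".log")  # True
matches_filter("AWSLogs/9999/abc.log", "AWSLogs/1234", ".log")  # False
```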
Schemas¶
Log schemas are required by StreamAlert to detect the correct log type of an incoming record.
Schemas are defined in conf/schemas/<log-type>.json
and used by rules to determine which records are analyzed.
They can be defined in a single file or split across multiple files, ideally one per log type, e.g. carbonblack.json
They represent the structure of a given log in the form of key/value pairs.
Each key in a schema corresponds to the name of a field referenced by rules and its value represents the data type the field is cast into.
Note
Ordering is strict for the csv
parser.
Log Options¶
Key | Required | Description
parser | Yes | The name of the parser to use for a given log’s data-type. Options include json, csv, kv, or syslog
schema | Yes | A map of key/value pairs of the name of each field with its type
configuration | No | Configuration options specific to this log type (see table below for more information)
Settings in configuration¶
The below settings may optionally be defined within the configuration block.
Key | Description
delimiter | For use with key/value or csv logs to identify the delimiter character for the log
envelope_keys | Used with nested records to identify keys that are at a higher level than the nested records, but still hold some value and should be stored
json_path | Path to nested records to be ‘extracted’ from within a JSON object
json_regex_key | The key name containing a JSON string to parse. This will become the final record
log_patterns | Various patterns to enforce within a log given provided fields
optional_top_level_keys | Keys that may or may not be present in a log being parsed
optional_envelope_keys | Keys that may or may not be present in the envelope of a log being parsed
priority | Integer value used to set the order in which schemas are tested against data, in the range 0..N, where 0 is the highest priority and N is the lowest
separator | For use with key/value logs to identify the separator character for the log
Writing Schemas¶
Schema values are strongly typed and enforced.
Normal types:
string - 'example'
integer - 0
float - 0.0
boolean - true/false
Special types:
{} - zero or more key/value pairs of any type
[] - zero or more elements of any type
Basic Schema Definition¶
Example Schema¶
{
"example_log_name": {
"parser": "json",
"schema": {
"field_1": "string",
"field_2": "integer",
"field_3": "boolean",
"field_4": "float",
"field_5": [],
"field_6": {}
}
}
}
Example rule¶
@rule(logs=['example_log_name'], # the log_name as defined above
outputs=['slack'])
def example_rule(rec):
"""Description of the rule"""
return (
rec['field_1'] == 'string-value' and # fields as defined in the schema above
rec['field_2'] < 5 and
'random-key-name' in rec['field_6']
)
Casting Normal Types¶
Example Schema¶
{
"example_log_name": {
"parser": "json",
"schema": {
"field_1": "string",
"field_2": "integer",
"field_3": "boolean"
}
}
}
Example Log Before Parse¶
{
"field_1": "test-string",
"field_2": "100",
"field_3": "true"
}
Example Log After Parsing¶
{
'field_1': 'test-string',
'field_2': 100,
'field_3': True
}
Example rule¶
Notice the boolean comparison for the newly-cast types.
@rule(logs=['example_log_name'],
outputs=['example_output'])
def example_rule(rec):
return (
rec['field_2'] == 100 and
rec['field_3'] is not False
)
Casting Special Types¶
Schemas can be as rigid or permissive as you want (see Example: osquery).
Usage of the special types normally indicates a loose schema, in that not every part of the incoming data is described.
Example Schema¶
{
"example_log_name": {
"parser": "json",
"schema": {
"field_1": "string",
"field_2": "integer",
"field_3": {}
}
}
}
Example Log Before Parse¶
{
"field_1": "test-string",
"field_2": "100",
"field_3": {
"data": "misc-data",
"time": "1491584265"
}
}
Example Log After Parsing¶
{
'field_1': 'test-string',
'field_2': 100,
'field_3': {
'data': 'misc-data',
'time': '1491584265'
}
}
Example Rule with a Loose Schema¶
@rule(logs=['example_log_name'],
outputs=['example_output'],
req_subkeys={'field_3': ['time']})
def example_rule_2(rec):
return (
rec['field_2'] == 100 and
last_hour(int(rec['field_3']['time']))
)
Also note the usage of req_subkeys above.
This keyword argument ensures that the parsed log contains the required subkey, i.e. that rec['field_3'] includes a time key.
Optional Top Level Keys¶
If incoming logs occasionally include/exclude certain fields, this can be expressed in the configuration
settings as optional_top_level_keys
.
The value of optional_top_level_keys
should be an array, with entries corresponding to the actual key in the schema that is optional. Any keys specified in this array should also be included in the defined schema.
If any of the optional_top_level_keys
do not exist in the log being parsed, defaults are appended to the parsed log depending on the declared value.
Example Schema¶
{
"test_log_type_json": {
"parser": "json",
"schema": {
"key1": [],
"key2": "string",
"key3": "integer",
"key4": "boolean",
"key5": "string"
},
"configuration": {
"optional_top_level_keys": [
"key4",
"key5"
]
}
}
}
Example Log Before Parse¶
{
"key1": [
1,
2,
3
],
"key2": "test",
"key3": 100,
"key4": true
}
Example Log After Parsing¶
{
 'key1': [1, 2, 3],
 'key2': 'test',
 'key3': 100,
 'key4': True, # value from the parsed log overrides the default
 'key5': '' # default value for string is inserted
}
JSON Parsing¶
Options¶
{
"log_name": {
"parser": "json",
"schema": {
"field": "type",
"field...": "type..."
},
"configuration": {
"json_path": "jsonpath expression",
"json_regex_key": "key with nested JSON string to extract",
"envelope_keys": {
"field": "type",
"field...": "type..."
}
}
}
}
Note
Options related to nested JSON are defined within configuration
. The json_path
key
should hold the JSON path to the records, while envelope_keys
is utilized to capture keys
in the root of our nested structure.
Nested JSON¶
Normally, a log contains all fields to be parsed at the top level:
{
"example": 1,
"host": "myhostname.domain.com",
"time": "10:00 AM"
}
In some cases, the fields to be parsed and analyzed may be nested several layers into the data:
{
"logs": {
"results": [
{
"example": 1,
"host": "jumphost-1.domain.com",
"time": "11:00 PM"
},
{
"example": 2,
"host": "jumphost-2.domain.com",
"time": "12:00 AM"
}
]
},
"id": 1431948983198,
"application": "my-app"
}
To extract these nested records, use the configuration
option json_path
:
{
"log_name": {
"parser": "json",
"schema": {
"example": "integer",
"host": "string",
"time": "string"
},
"configuration": {
"json_path": "logs.results[*]"
}
}
}
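The effect of the json_path expression above is equivalent to this plain dictionary traversal (StreamAlert uses a JSONPath library internally; this sketch only shows what "logs.results[*]" selects):

```python
# Sample log from above; "logs.results[*]" selects each element of the
# nested results list, and each element becomes its own record.
log = {
    "logs": {"results": [
        {"example": 1, "host": "jumphost-1.domain.com", "time": "11:00 PM"},
        {"example": 2, "host": "jumphost-2.domain.com", "time": "12:00 AM"},
    ]},
    "id": 1431948983198,
    "application": "my-app",
}
records = log["logs"]["results"]
```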
Log Patterns¶
Log patterns provide the ability to differentiate log schemas that are nearly identical.
They can be added by using the configuration
option log_patterns
.
Log patterns are a collection of key/value pairs where the key is the name of the field, and the value is a list of expressions the log parser will search for in said field of the log.
If any of the log patterns listed exists in a specific field, the parser will consider the data valid.
This feature is helpful to reduce false positives, as it provides the ability to match a schema only if specific values are present in a log.
Wild card log patterns are supported using the *
or ?
symbols, as shown below.
Example schema:
{
"log_name": {
"schema": {
"computer_name": "string",
"hostname": "string",
"instance_id": "string",
"process_id": "string",
"message": "string",
"timestamp": "float",
"type": "string"
},
"parser": "json",
"configuration": {
"log_patterns": {
"type": [
"*bad.log.type*"
]
}
}
}
}
Example logs:
{
"computer_name": "test-server-name",
"hostname": "okay_host",
"instance_id": "95909",
"process_id": "82571",
"message": "this is not important info",
"timestamp": "1427381694.88",
"type": "good.log.type.value"
}
Note
The above log will not match the schema above, because its type field does not contain the pattern *bad.log.type*.
{
"computer_name": "fake-server-name",
"hostname": "bad_host",
"instance_id": "589891",
"process_id": "72491",
"message": "this is super important info",
"timestamp": "1486943917.12",
"type": "bad.log.type.value"
}
Note
The above log will match the schema above, because its type field contains the pattern *bad.log.type*.
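The wildcard behavior can be approximated with shell-style matching, as in Python’s fnmatch; this is an assumption for illustration, not necessarily the exact matching StreamAlert performs:

```python
from fnmatch import fnmatch

# A field is considered a match if ANY listed pattern matches its value,
# with '*' and '?' behaving like shell-style wildcards.
def field_matches(value, patterns):
    return any(fnmatch(value, pattern) for pattern in patterns)

field_matches("good.log.type.value", ["*bad.log.type*"])  # False: no match
field_matches("bad.log.type.value", ["*bad.log.type*"])   # True: match
```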
Envelope Keys¶
Continuing with the example above, if the id
and application
keys in the root of the log are needed for analysis, they can be added by using the configuration
option envelope_keys
:
{
"log_name": {
"parser": "json",
"schema": {
"example": "integer",
"host": "string",
"time": "string"
},
"configuration": {
"json_path": "logs.results[*]",
"envelope_keys": {
"id": "integer",
"application": "string"
}
}
}
}
The resultant parsed records:
[
{
"example": 1,
"host": "jumphost-1.domain.com",
"time": "11:00 PM",
"streamalert:envelope_keys": {
"id": 1431948983198,
"application": "my-app"
}
},
{
"example": 2,
"host": "jumphost-2.domain.com",
"time": "12:00 AM",
"streamalert:envelope_keys": {
"id": 1431948983198,
"application": "my-app"
}
}
]
Nested JSON Regex Parsing¶
When using forwarders such as fluentd
, logstash
, or rsyslog
, log data may be wrapped with additional context keys:
{
"collector": "my-app-1",
"date-collected": "Oct 12, 2017",
"@timestamp": "1507845487",
"data": "<0> program[pid]: {'actual': 'data is here'}"
}
To parse the nested JSON string as the record, use the following schema options:
{
"json:regex_key_with_envelope": {
"schema": {
"actual": "string"
},
"parser": "json",
"configuration": {
"envelope_keys": {
"collector": "string",
"date-collected": "string",
"@timestamp": "string"
},
"json_regex_key": "data"
}
}
}
Optionally, you can omit envelope keys if they provide no value in rules.
CSV Parsing¶
Options¶
{
"csv_log_name": {
"parser": "csv",
"schema": {
"field": "type",
"field...": "type..."
},
"configuration": {
"delimiter": ","
}
}
}
Note
A custom delimiter is specified within configuration
above.
By default, the csv
parser will use ,
as the delimiter.
The configuration
setting is optional.
Ordering of the fields within schema
is strict.
Nested CSV¶
Some CSV logs have nested fields.
Example logs:
"1485729127","john_adams","memcache,us-east1"
"1485729127","john_adams","mysqldb,us-west1"
You can support this with a schema similar to the following:
{
"example_csv_with_nesting": {
"parser": "csv",
"schema": {
"time": "integer",
"user": "string",
"message": {
"role": "string",
"region": "string"
}
}
}
}
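Conceptually, the nested field is parsed by splitting the quoted column into its own comma-delimited sub-fields and mapping them onto the nested schema in order. A minimal sketch of that idea (not StreamAlert’s actual parser):

```python
import csv
import io

# The third column is itself comma-delimited; its pieces map in order onto
# the nested schema keys (role, region).
line = '"1485729127","john_adams","memcache,us-east1"'
time_, user, message = next(csv.reader(io.StringIO(line)))
role, region = message.split(",")
record = {"time": int(time_), "user": user,
          "message": {"role": role, "region": region}}
```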
KV Parsing¶
Options¶
{
"kv_log_name": {
"parser": "kv",
"schema": {
"field": "type",
"field...": "type..."
},
"configuration": {
"delimiter": " ",
"separator": "="
}
}
}
Note
The delimiter
and separator
keys within configuration
indicate the values to use for delimiter and field separator, respectively.
By default, the kv
parser will use a single space as the delimiter and =
as the field separator.
The configuration
setting is optional.
Example schema:
{
"example_kv_log_type": {
"parser": "kv",
"schema": {
"time": "integer",
"user": "string",
"result": "string"
}
}
}
Example log:
"time=1039395819 user=bob result=pass"
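Key/value parsing amounts to splitting the log on the delimiter, then splitting each token on the separator; a minimal sketch with the defaults (space delimiter, = separator), with types then cast per the schema:

```python
# Split "key=value" tokens on the space delimiter, then on the '=' separator.
line = "time=1039395819 user=bob result=pass"
pairs = dict(token.split("=", 1) for token in line.split(" "))
record = {"time": int(pairs["time"]),
          "user": pairs["user"],
          "result": pairs["result"]}
```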
Syslog Parsing¶
Options¶
{
"syslog_log_name": {
"parser": "syslog",
"schema": {
"timestamp": "string",
"host": "string",
"application": "string",
"message": "string"
}
}
}
The syslog
parser has no configuration
options.
The schema is also static for this parser because of the regex used to parse records.
Log Format¶
The syslog
parser matches events with the following format:
timestamp(Month DD HH:MM:SS) host application: message
Example logs:
Jan 10 19:35:33 vagrant-ubuntu-trusty-64 sudo: session opened for root
Jan 10 19:35:13 vagrant-ubuntu-precise-32 ssh[13941]: login for user
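The format above can be matched with a regular expression along these lines; this is an approximation for illustration, not necessarily the exact pattern StreamAlert uses:

```python
import re

# Groups mirror the static syslog schema: timestamp, host, application, message.
# An optional [pid] after the application name is discarded.
SYSLOG_RE = re.compile(
    r"(?P<timestamp>\w{3}\s\d{1,2}\s\d{2}:\d{2}:\d{2})\s"
    r"(?P<host>\S+)\s"
    r"(?P<application>[^:\[\s]+)(\[\d+\])?:\s"
    r"(?P<message>.*)"
)

match = SYSLOG_RE.match(
    "Jan 10 19:35:33 vagrant-ubuntu-trusty-64 sudo: session opened for root"
)
```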
More Examples¶
For a list of schema examples, see Example Schemas
Deployment¶
Make sure you’ve completed the Getting Started instructions prior to continuing.
Initial Build¶
To initialize StreamAlert:
python manage.py init
This will perform the following:
Create S3 buckets and encryption keys.
Create AWS Lambda functions.
Build declared infrastructure in the Terraform files.
Deploy initial production AWS Lambda versions.
Type yes
at each prompt.
Continuous Deployment¶
As new rules, sources, or outputs are added to StreamAlert, new versions of the AWS Lambda functions must be deployed for changes to become effective.
To accomplish this, manage.py
contains a deploy
command.
To deploy new changes for all AWS Lambda functions:
python manage.py deploy
Optionally, to deploy changes for only a specific AWS Lambda function:
python manage.py deploy --functions alert
python manage.py deploy --functions alert_merger
python manage.py deploy --functions apps
python manage.py deploy --functions athena
python manage.py deploy --functions classifier
python manage.py deploy --functions rule
python manage.py deploy --functions rule_promo
python manage.py deploy --functions threat_intel_downloader
To apply infrastructure level changes (additional Kinesis Shards, new CloudTrails, etc), run:
python manage.py build
To apply specific changes and speed up the Terraform run, use the list-targets
command and the build
command with the --target
option:
python manage.py list-targets
Target Type
----------------------------------------------------------------------------------------------
...
classifier_prod_iam module
classifier_prod_lambda module
cloudwatch_monitoring_prod module
kinesis_events_prod module
kinesis_prod module
metric_filters_Classifier_FailedParses_PROD module
metric_filters_Classifier_FirehoseFailedRecords_PROD module
metric_filters_Classifier_FirehoseRecordsSent_PROD module
...
python manage.py build --target cloudwatch_monitoring_prod # apply to single module
python manage.py build --target kinesis_prod classifier_prod_iam # apply to two modules
python manage.py build --target metric_filters_Classifier_*_PROD # apply to three modules
Monitoring Functions¶
StreamAlert clusters contain a module to create CloudWatch Alarms for monitoring AWS Lambda invocation errors.
These help ensure that the currently running code is reliable. To access these monitors, log in to the AWS Console, go to CloudWatch, and click Alarms.
Rollback¶
StreamAlert Lambda functions are invoked via a production
alias that can be easily rolled back
to point to the previous version:
python manage.py rollback --functions rule
python manage.py rollback --functions alert
python manage.py rollback
This is helpful to quickly revert changes to Lambda functions, e.g. if a bad rule was deployed.
Rules¶
Rules contain data analysis and alerting logic
Rules are written in native Python, not a proprietary language
A Rule can utilize any Python function or library
A Rule can be run against multiple log sources if desired
Rules can be isolated into defined environments/clusters
Rule alerts can be sent to one or more outputs, like S3, PagerDuty or Slack
Rules can be unit and integration tested
Getting Started¶
All rules are located in the rules/
directory.
Within this directory are two folders: community/
and default/
.
community/
rules are publicly shared from StreamAlert contributors.
default/
can be used as a generic container for rules files.
You may create any folder structure desired, as all rules folders are imported recursively. Here are some examples:
rules/intrusion-detection/malware.py
rules/compliance/pci.py
rules/default/infrastructure.py
Note
If you create additional folders within the rules
directory, be sure to include a blank __init__.py
file.
Overview¶
A StreamAlert rule is a Python method that takes a parsed log record (dictionary) and returns True or False.
A return value of True
means an alert will be generated.
Example: The Basics¶
The simplest possible rule looks like this:
from streamalert.shared.rule import rule
@rule(logs=['cloudwatch:events'])
def all_cloudwatch_events(record):
"""Minimal StreamAlert rule: alert on all CloudWatch events"""
return True
This rule will be evaluated against all inbound logs that match the cloudwatch:events
schema defined in a schema file in the conf/schemas
directory, i.e conf/schemas/cloudwatch.json
.
In this case, all CloudWatch events will generate an alert, which will be sent to the alerts Athena table.
Example: Logic & Outputs¶
Let’s modify the rule to page the security team if anyone ever uses AWS root credentials:
from streamalert.shared.rule import rule
@rule(logs=['cloudwatch:events'], outputs=['pagerduty:csirt', 'slack:security'])
def cloudtrail_root_account_usage(record):
"""Page security team for any usage of AWS root account"""
return (record['detail']['userIdentity']['type'] == 'Root'
and record['detail']['userIdentity'].get('invokedBy') is None
and record['detail']['eventType'] != 'AwsServiceEvent')
Now, any AWS root account usage is reported to PagerDuty, Slack, and the aforementioned Athena table. In order for this to work, your datasources and outputs must be configured so that:
CloudTrail logs are being sent to StreamAlert via CloudWatch events
The pagerduty:csirt and slack:security outputs have the proper credentials
Example: More Rule Options¶
The previous example suffers from the following problems:
It only works for CloudTrail data sent via CloudWatch
A single legitimate root login may generate hundreds of alerts in the span of a few minutes
There is no distinction between different AWS account IDs
We can generalize the rule to alleviate these issues:
from rules.helpers.base import get_first_key # Find first key recursively in record
from streamalert.shared.rule import rule
# This could alternatively be defined in matchers/matchers.py to be shareable
_PROD_ACCOUNTS = {'111111111111', '222222222222'}
def prod_account(record):
"""Match logs for one of the production AWS accounts"""
return (
record.get('account') in _PROD_ACCOUNTS or
get_first_key(record, 'userIdentity', {}).get('accountId') in _PROD_ACCOUNTS
)
@rule(
logs=['cloudtrail:events', 'cloudwatch:events'], # Rule applies to these 2 schemas
matchers=[prod_account], # Must be satisfied before rule is evaluated
merge_by_keys=['useragent'], # Merge alerts with the same 'useragent' key-value pair
merge_window_mins=5, # Merge alerts every 5 minutes
outputs=['pagerduty:csirt', 'slack:security'] # Send alerts to these 2 outputs
)
def cloudtrail_root_account_usage(record):
"""Page security team for any usage of AWS root account"""
return (
get_first_key(record, 'userIdentity', {}).get('type') == 'Root' and
not get_first_key(record, 'invokedBy') and
get_first_key(record, 'eventType') != 'AwsServiceEvent'
)
To simplify rule logic, you can extract common routines into custom helper methods.
These helpers are defined in rules/helpers/base.py
and can be called from within a matcher or rule (as shown here).
Since rules are written in Python, you can make them as sophisticated as you want!
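For instance, a recursive lookup helper in the spirit of get_first_key might look like the sketch below. The name find_first_key and its behavior are hypothetical illustrations; the real helper in rules/helpers/base.py may differ:

```python
# Hypothetical sketch of a depth-first lookup helper; returns the first value
# found under search_key anywhere in a nested dict/list structure.
def find_first_key(data, search_key, default=None):
    if isinstance(data, dict):
        if search_key in data:
            return data[search_key]
        for value in data.values():
            found = find_first_key(value, search_key)
            if found is not None:
                return found
    elif isinstance(data, list):
        for item in data:
            found = find_first_key(item, search_key)
            if found is not None:
                return found
    return default  # note: falsy-but-present values would need extra care

record = {"detail": {"userIdentity": {"type": "Root", "accountId": "111111111111"}}}
```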
Rule Options¶
The following table provides an overview of each rule option, with more details below:
@rule kwarg | Type | Description
context | Dict | Dynamically configurable context passed to the alert processor
datatypes | List | List of normalized type names the rule applies to
logs | List | List of log schemas the rule applies to
matchers | List | Matcher pre-conditions which must be met before rule logic runs
merge_by_keys | List | List of key names that must match in value before merging alerts
merge_window_mins | Int | Merge related alerts at this interval rather than sending immediately
outputs | List | List of alert outputs
dynamic_outputs | List | List of functions which return valid outputs
req_subkeys | Dict | Subkeys which must be present in the record
- context
context
can pass extra instructions to the alert processor for more precise routing:# Context provided to the pagerduty-incident output with # instructions to assign the incident to a user. @rule(logs=['osquery:differential'], outputs=['pagerduty:csirt'], context={'pagerduty-incident': {'assigned_user': 'valid_user'}}) def my_rule(record, context): context['pagerduty-incident']['assigned_user'] = record['username'] return True
- datatypes
conf/normalized_types.json
defines data normalization, whereby you can write rules against a common type instead of a specific field or schema:"""These rules apply to several different log types, defined in conf/normalized_types.json""" from streamalert.shared.rule import rule from streamalert.shared.normalize import Normalizer @rule(datatypes=['sourceAddress'], outputs=['aws-sns:my-topic']) def ip_watchlist_hit(record): """Source IP address matches watchlist.""" return '127.0.0.1' in Normalizer.get_values_for_normalized_type(record, 'sourceAddress') @rule(datatypes=['command'], outputs=['aws-sns:my-topic']) def command_etc_shadow(record): """Command line arguments include /etc/shadow""" return any( '/etc/shadow' in cmd.lower() for cmd in Normalizer.get_values_for_normalized_type(record, 'command') )
- logs
logs define the log schema(s) supported by the rule.
Log datasources are defined within the data_sources field of a cluster, such as conf/clusters/<cluster>.json, and their schemas are defined in one or more files in the conf/schemas directory.
Note
Either logs or datatypes must be specified for each rule
- matchers
matchers define conditions that must be satisfied in order for the rule to be evaluated. Default matchers are defined in matchers/matchers.py but can also be defined in the rules file (see example above).
A matcher function should accept a single argument, just like rules. That argument will be the record that is being evaluated by the rule.
Rules can utilize matchers to reduce redundancy of code, allowing you to define the logic once and easily use it across multiple rules.
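As an illustration, here is a standalone sketch of how matchers gate rule evaluation. The field names and the `evaluate()` driver are hypothetical; in StreamAlert itself the wiring happens through the `matchers=[...]` argument to `@rule`:

```python
def prod_environment(record):
    """Matcher: pre-condition checked before any rule logic runs (hypothetical field)."""
    return record.get('environment') == 'prod'

def high_severity(record):
    """Rule logic: only evaluated once all matchers pass."""
    return record.get('severity', 0) >= 5

def evaluate(record, matchers, rule_func):
    """Mimic matcher gating: every matcher must pass before the rule is evaluated."""
    if not all(matcher(record) for matcher in matchers):
        return False
    return rule_func(record)

# The rule logic never runs for non-prod records, however severe
assert evaluate({'environment': 'prod', 'severity': 7}, [prod_environment], high_severity) is True
assert evaluate({'environment': 'dev', 'severity': 7}, [prod_environment], high_severity) is False
```

Because the matcher is a plain function, the same pre-condition can be shared by many rules without duplicating the check in each rule body.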
- merge_by_keys / merge_window_mins
Note
Specify neither or both of these fields, not one of them in isolation
For a better alert triage experience, you can merge alerts whose records share one or more fields in common:
@rule(logs=['your-schema'], merge_by_keys=['alpha', 'beta', 'gamma'], merge_window_mins=5)
def merged_rule(record):
    return True
The alert merger Lambda function will buffer all of these alerts until 5 minutes have elapsed, at which point
{ "alpha": "A", "nested": { "beta": "B" }, "gamma": [1, 2, 3], "timestamp": 123 }
would be automatically merged with
{ "alpha": "A", "nested": { "beta": "B", "extra": "field" }, "gamma": [1, 2, 3], "timestamp": 456 }
A single consolidated alert will be sent showing the common keys and the record differences. All of the specified merge keys must have the same value in order for two records to be merged, but those keys can be nested anywhere in the record structure.
Note
The original (unmerged) alert will always be sent to Athena.
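The merge-key comparison described above can be sketched as follows. `get_nested` and `mergeable` are illustrative helpers, not StreamAlert APIs; they mimic how a merge key is located anywhere in the record structure and compared by value:

```python
def get_nested(record, key):
    """Find the first occurrence of a key anywhere in a nested record (sketch)."""
    if isinstance(record, dict):
        if key in record:
            return record[key]
        for value in record.values():
            found = get_nested(value, key)
            if found is not None:
                return found
    return None

def mergeable(record_a, record_b, merge_keys):
    """Two records merge only if every merge key has the same value in both."""
    return all(get_nested(record_a, k) == get_nested(record_b, k) for k in merge_keys)

a = {'alpha': 'A', 'nested': {'beta': 'B'}, 'gamma': [1, 2, 3], 'timestamp': 123}
b = {'alpha': 'A', 'nested': {'beta': 'B', 'extra': 'field'}, 'gamma': [1, 2, 3], 'timestamp': 456}

# 'beta' is nested in both records, yet still matches by value
assert mergeable(a, b, ['alpha', 'beta', 'gamma'])
assert not mergeable(a, {'alpha': 'X'}, ['alpha'])
```

Note that `timestamp` differs between the two records but is not a merge key, so it would simply show up in the record differences of the consolidated alert.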
- dynamic_outputs
The dynamic_outputs keyword argument defines additional outputs for an Alert which are dynamically generated. See Dynamic Outputs for more info.
- outputs
The outputs keyword argument defines the alert destination if the return value of a rule is True. Alerts are always sent to an Athena alerts table, which is easy to query. Any number of additional outputs can be specified.
- req_subkeys
req_subkeys defines sub-keys that must exist in the incoming record (with a non-zero value) in order for it to be evaluated.
This feature should be used if you have logs with a loose schema, in order to avoid raising a KeyError in rules.
# The 'columns' key must contain sub-keys of 'address' and 'hostnames'
@rule(logs=['osquery:differential'],
      outputs=['aws-lambda:my-function'],
      req_subkeys={'columns': ['address', 'hostnames']})
def osquery_host_check(rec):
    # If all logs did not have the 'address' sub-key, this rule would
    # throw a KeyError. Using req_subkeys avoids this.
    return rec['columns']['address'] == '127.0.0.1'
Disabling Rules¶
In the event that a rule must be temporarily disabled, the @disable
decorator can be used.
This allows you to keep the rule definition and tests in place instead of having to remove them entirely:
from streamalert.shared.rule import disable, rule
@disable # TODO: this rule is too noisy!
@rule(logs=['example'], outputs=['slack'])
def example_rule(record):
return True
Testing¶
To test the accuracy of new rules, local tests can be written to verify that alerts trigger against valid input.
The manage.py
CLI tool comes built-in with a test
command which does exactly this.
Configuration¶
To test a new rule, first create a new JSON file next to your rule file. The suggested convention is to use the same name as the rule you are testing, though any name will work. This helps with organization, but test events may be created anywhere within the same top-level directory where your rules are stored.
Basic Configuration¶
Each test event file should contain the following structure:
[
{
"data": {
"key_01": "value_01",
"key_02": "value_02"
},
"description": "This test should trigger or not trigger an alert",
"log": "The log name declared in a json file under the conf/schemas directory",
"service": "The service sending the log - kinesis, s3, sns, or streamalert_app",
"source": "The exact resource which sent the log - kinesis stream name, s3 bucket ID, SNS topic name, or streamalert_app function name",
"trigger_rules": [
"rule_name_that_should_trigger_for_this_event",
"another_rule_name_that_should_trigger_for_this_event"
]
}
]
Note
Multiple tests can be included in one file by adding them to the array above.
Specifying Test Data¶
When specifying the test data, use one of two fields:
"data": An entire example record, with all necessary fields to properly classify
"override_record": A subset of the example record, where only relevant fields are populated
The advantage of option #2 is that the overall test event is much smaller.
The testing framework will auto-populate the records behind the scenes with the remaining fields for that given log type.
For example:
[
{
"data": {
"account": "123456789102",
"detail": {
"request": {
"eventName": "putObject",
"bucketName": "testBucket"
}
},
"detail-type": "API Call",
"id": "123456",
"region": "us-west-2",
"resources": [
"testBucket"
],
"source": "aws.s3",
"time": "Jan 01 2018 12:00",
"version": "1.05"
},
"description": "An example test with a full cloudwatch event",
"log": "cloudwatch:events",
"service": "s3",
"source": "test-s3-bucket-name",
"trigger_rules": [
"my_fake_rule"
]
}
]
Let’s say a rule is only checking the value of source
in the test event. In that case, there’s no added benefit to fill in all the other data. Here is what the event would look like with override_record
:
[
{
"override_record": {
"source": "aws.s3"
},
"description": "An example test with a partial cloudwatch event",
"log": "cloudwatch:events",
"service": "s3",
"source": "test-s3-bucket-name",
"trigger_rules": [
"my_fake_rule"
]
}
]
Both test events would have the same result, but with much less effort.
Note
Either override_record
or data
is required in the test event
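Behind the scenes, the auto-population of override_record amounts to overlaying the partial record onto a full template for the log type. A minimal sketch of that idea (deep_merge is a hypothetical helper, not the framework's actual implementation):

```python
def deep_merge(template, override):
    """Overlay override fields onto a template record (sketch of auto-population)."""
    merged = dict(template)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

# Hypothetical template with the remaining fields for the log type
template = {'account': '', 'source': '', 'detail': {'request': {}}, 'region': ''}
override = {'source': 'aws.s3'}

record = deep_merge(template, override)
assert record['source'] == 'aws.s3'        # overridden field
assert record['region'] == ''              # template field preserved
```

This is why the override_record variant of the test above classifies identically to the full-record variant: all schema fields are present either way.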
Testing Classification¶
Classification tests are always run on each test. Consider these two fields in the test configuration:
[
{
"log": "cloudwatch:events",
"classify_only": true
}
]
The log
field in each test specifies the expected classified type of the test record. The test will fail
if the classified log type differs.
By default, the test runner will continue on to test rules. If you only wish to test classification,
specify classify_only
as true
.
Testing Rules¶
Assuming a test is not classify_only
, rules are run after classification. Consider this field in the test file:
[
{
"trigger_rules": [
"my_first_fake_rule",
"my_second_fake_rule"
]
}
]
All rules are run on each set of test data. The trigger_rules
field specifies an array of rule names that should
be triggered as a result. An empty array implies that the test data should not trigger any rules.
Publisher Tests¶
Consider the following rule:
@rule(
logs=['cloudwatch:events'],
outputs=['slack:sample-channel'],
publishers={'slack': my_publisher}
)
def my_rule(record):
# ... rule logic
return True
To test the output of the Alert Publisher framework, you can specify publisher_tests
. Consider this field:
[
{
"trigger_rules": ["my_rule"],
"publisher_tests": {
"slack:sample-channel": [
{
"jmespath_expression": "path.to.record",
"condition": "is",
"value": 4
},
[ "path.to.other.record", "is", 5 ]
]
}
}
]
This field is a dictionary, where keys specify outputs to test. Each key’s value is an array of publisher tests. These tests compare the Alert Publisher’s output to a configured expectation.
Each publisher test can be a dict with 3 keys:
jmespath_expression: A jmespath search expression. This is run on the Alert Publisher output for the given OutputDispatcher.
condition: Either "is" or "in", for equality or substring/subset matching, respectively.
value: The expected value of the field.
The field that is extracted via the jmespath_expression is tested against the expected value, using the condition.
Note
An alternate shorthand syntax to the above is to specify a triple of strings:
["path.to.field", "is", "value"]
Rule Test Reference¶
Key | Type | Required | Description
compress | boolean | No | Whether or not to compress records with gzip prior to testing
data | map | Yes* | The record to test against your rules, with all necessary fields present to properly classify
override_record | map | Yes* | A partial record to use in test events, more information below
*Exactly one of data or override_record is required
description | string | Yes | A short sentence describing the intent of the test
log | string | Yes | The log type this test record should parse as. The value of this should be taken from the defined logs in one or more files in the conf/schemas directory
service | string | Yes | The name of the service which sent the log. This should be one of: kinesis, s3, sns, or streamalert_app
source | string | Yes | The name of the Kinesis Stream or S3 bucket, SNS topic or StreamAlert App function where the data originated from. This value should match a source provided in the data_sources field of the cluster configuration
trigger_rules | list | No | A list of zero or more rule names that this test record should trigger. An empty list implies this record should not trigger any alerts
classify_only | boolean | No | Whether or not the test record should go through the rule processing engine. If set to true, the record will only be tested for classification
publisher_tests | map | No | This is a dict of tests to run against the Alert's published representation. The keys of the dict are output descriptors. The values of the dict should be arrays of individual tests. Publisher tests use jmespath to extract values from the final publication dictionary for testing. At least one rule should be triggered, or publisher tests will do nothing.
test_fixtures | map | No | Values to be mocked out for use within rules for the threat_intel and lookup_tables features
Test Fixtures Configuration¶
Fixtures for test events should be configured as part of the event itself. These should be
added within the threat_intel
or lookup_tables
keys under a test_fixtures
section
of the test event. Usage of these two sections is outlined below.
Threat Intel Fixtures¶
The below format should be used to “mock” out threat intel data to test rules that leverage this feature.
[
{
"test_fixtures": {
"threat_intel": [
{
"ioc_value": "1.2.3.4",
"ioc_type": "ip",
"sub_type": "mal_ip"
},
{
"ioc_value": "0123456789abcdef0123456789abcdef",
"ioc_type": "md5",
"sub_type": "mal_md5"
}
]
}
}
]
Lookup Tables Fixtures¶
The below format should be used to “mock” out lookup table data to test rules that leverage this feature.
[
{
"test_fixtures": {
"lookup_tables": {
"dynamodb-table-name": {
"lookup_key": [
"value_for_rule"
]
}
}
}
}
]
For more examples of how to configure tests for rules, see the provided default rules and tests in the rules/
directory.
Running Tests¶
Tests are run via the manage.py
script. These tests include the ability to validate defined
log schemas for accuracy, as well as rules efficacy. Additionally, alerts can be sent from the local
system to a real, live alerting output (if configured).
The below options are available for running tests. Please note that each subsequent test command
here includes all of the prior tests. For instance, the rules
command will also test everything
that the classifier
command tests. See the Test Options section for available options for
all of these commands.
Classifier Tests¶
Running tests to ensure test events classify properly:
python manage.py test classifier
Note
The classifier
test command does not test the efficacy of rules, and simply ensures
defined test events classify as their expected schema type.
Rule Tests¶
Running tests to ensure test events classify properly and trigger the designated rules:
python manage.py test rules
Live Tests¶
Running tests to actually send alerts to a rule’s configured outputs:
python manage.py test live
Note
The live
test command does not invoke any deployed Lambda functions, and only
uses the local code, test events, and rules. However, authentication secrets needed to send alerts
are in fact read from S3 during this process, so AWS credentials must still be set up properly.
Test Options¶
Any of the test commands above can be restricted to specific files to reduce time and output:
python manage.py test classifier --test-files <test_file_01.json> <test_file_02>
Note
Only the name of the file is required, with or without the file extension, not the entire path.
Tests can also be restricted to specific rules:
python manage.py test rules --test-rules <rule_01> <rule_02>
Note
Note that this is the name of the rule(s) themselves, not the name of the Python file containing the rule(s).
Tests can be directed to run against an alternative directory of test event files:
python manage.py test rules --files-dir /path/to/alternate/test/files/directory
Test Examples¶
Here is a sample command showing how to run tests against two test event files included in the default StreamAlert configuration:
python manage.py test rules --test-files rules/community/cloudwatch_events/cloudtrail_put_bucket_acl.json rules/community/cloudwatch_events/cloudtrail_root_account_usage.json
This will produce output similar to the following:
Running tests for files found in: rules
File: rules/community/cloudwatch_events/cloudtrail_put_bucket_acl.json
Test #01: Pass
Test #02: Pass
File: rules/community/cloudwatch_events/cloudtrail_root_account_usage.json
Test #01: Pass
Test #02: Pass
Summary:
Total Tests: 4
Pass: 4
Fail: 0
To see more verbose output for any of the test commands, add the --verbose
flag. The previous
command, with the addition of the --verbose
flag, produces the following output:
Running tests for files found in: rules
File: rules/community/cloudwatch_events/cloudtrail_put_bucket_acl.json
Test #01:
Description: Modifying an S3 bucket to have a bucket ACL of AllUsers or AuthenticatedUsers should create an alert.
Classification: Pass
Classified Type: cloudwatch:events
Expected Type: cloudwatch:events
Rules: Pass
Triggered Rules: cloudtrail_put_bucket_acl
Expected Rules: cloudtrail_put_bucket_acl
Test #02:
Description: Modifying an S3 bucket ACL without use of AllUsers or AuthenticatedUsers should not create an alert.
Classification: Pass
Classified Type: cloudwatch:events
Expected Type: cloudwatch:events
Rules: Pass
Triggered Rules: <None>
Expected Rules: <None>
File: rules/community/cloudwatch_events/cloudtrail_root_account_usage.json
Test #01:
Description: Use of the AWS 'Root' account will create an alert.
Classification: Pass
Classified Type: cloudwatch:events
Expected Type: cloudwatch:events
Rules: Pass
Triggered Rules: cloudtrail_root_account_usage
Expected Rules: cloudtrail_root_account_usage
Test #02:
Description: AWS 'Root' account activity initiated automatically by an AWS service on your behalf will not create an alert.
Classification: Pass
Classified Type: cloudwatch:events
Expected Type: cloudwatch:events
Rules: Pass
Triggered Rules: <None>
Expected Rules: <None>
Summary:
Total Tests: 4
Pass: 4
Fail: 0
Additionally, any given test that results in a status of Fail will, by default, print verbosely.
In the below example, the cloudtrail_put_bucket_acl.json
file has been altered to include a triggering
rule that does not actually exist.
python manage.py test rules --test-files rules/community/cloudwatch_events/cloudtrail_put_bucket_acl.json rules/community/cloudwatch_events/cloudtrail_root_account_usage.json
Running tests for files found in: rules
File: rules/community/cloudwatch_events/cloudtrail_put_bucket_acl.json
Test #01:
Description: Modifying an S3 bucket to have a bucket ACL of AllUsers or AuthenticatedUsers should create an alert.
Classification: Pass
Classified Type: cloudwatch:events
Expected Type: cloudwatch:events
Rules: Fail
Triggered Rules: cloudtrail_put_bucket_acl
Expected Rules: cloudtrail_put_bucket_acl, nonexistent_rule (does not exist)
Test #02: Pass
File: rules/community/cloudwatch_events/cloudtrail_root_account_usage.json
Test #01: Pass
Test #02: Pass
Summary:
Total Tests: 4
Pass: 3
Fail: 1
Helpers¶
It may occasionally be necessary to dynamically fill in values in the test event data. For instance, if a
rule relies on the time of an event, the last_hour
helper can be embedded in a test event as a key’s value.
The embedded helper string will be replaced with the value returned by the helper function.
Available Helpers¶
last_hour
: Generates a unix epoch time within the last hour (ex: 1489105783
).
Usage¶
To use these helpers in rule testing, replace a specific log field value with the following:
"<helper:helper_name_goes_here>"
For example, to replace a time field with a value in the last hour, use last_hour
:
{
"records": [
{
"data": {
"host": "app01.prod.mydomain.net",
"time": "<helper:last_hour>"
},
"description": "example usage of helpers",
"log": "host_time_log",
"service": "kinesis",
"source": "my_demo_kinesis_stream",
"trigger_rules": [
"last_hour_rule_name"
]
}
]
}
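Conceptually, the test runner substitutes each "<helper:...>" string with the helper's return value before classification. A minimal sketch of that substitution (resolve_helpers is illustrative, not the framework's actual code):

```python
import random
import re
import time

def last_hour():
    """Generate a unix epoch time from within the last hour."""
    return int(time.time()) - random.randint(0, 3599)

HELPERS = {'last_hour': last_hour}

def resolve_helpers(value):
    """Recursively replace '<helper:name>' strings with helper return values."""
    if isinstance(value, dict):
        return {key: resolve_helpers(val) for key, val in value.items()}
    if isinstance(value, list):
        return [resolve_helpers(val) for val in value]
    if isinstance(value, str):
        match = re.fullmatch(r'<helper:(\w+)>', value)
        if match:
            return HELPERS[match.group(1)]()
    return value

data = {'host': 'app01.prod.mydomain.net', 'time': '<helper:last_hour>'}
resolved = resolve_helpers(data)  # 'time' becomes an epoch int within the last hour
```

Non-helper strings pass through untouched, so only the placeholder values in the test event change.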
Outputs¶
StreamAlert comes with a flexible alerting framework that can integrate with new or existing case/incident management tools. Rules can send alerts to one or many outputs.
Out of the box, StreamAlert supports:
Amazon CloudWatch Logs
Amazon Kinesis Firehose
AWS Lambda
AWS S3
AWS SES
AWS SNS
AWS SQS
Carbon Black
Demisto
GitHub
Jira
Komand
PagerDuty
Phantom
Slack
Microsoft Teams
StreamAlert can be extended to support any API. Creating a new output to send alerts to is easily accomplished through inheritance from the StreamOutputBase
class. More on that in the Adding Support for New Services section below.
With the addition of an output configuration, multiple outputs per service are possible. For example, rules can dispatch alerts to multiple people or channels in Slack.
Adhering to the secure by default principle, all API credentials are encrypted and decrypted using AWS Key Management Service (KMS). Credentials are stored on Amazon S3 and are not packaged with the StreamAlert code. They are downloaded and decrypted on an as-needed basis. Credentials are never cached on disk in a decrypted state.
Configuration¶
Adding a new configuration for a currently supported service is handled using manage.py
:
python manage.py output <SERVICE_NAME>
Note
<SERVICE_NAME>
above should be one of the following supported service identifiers.
aws-cloudwatch-log, aws-firehose, aws-lambda, aws-lambda-v2, aws-s3, aws-sns, aws-sqs, carbonblack, github, jira, komand, pagerduty, pagerduty-incident, pagerduty-v2, phantom, slack
For example:
python manage.py output slack
The above command will then prompt the user for a descriptor
to use for this configuration:
Please supply a short and unique descriptor for this Slack integration (eg: channel, group, etc):
After a descriptor
is provided, the user is then prompted for the Slack webhook URL:
Please supply the full Slack webhook url, including the secret:
Note
The user input for the Slack webhook URL will be masked. This masking approach currently applies to any potentially sensitive information the user must enter on the CLI, and can be enforced in any new services that are implemented.
Adding Support for New Services¶
Adding support for a new service involves five steps:
Create a subclass of OutputDispatcher
For reference, OutputDispatcher is declared in streamalert/alert_processor/outputs/output_base.py
Implement the following methods, at a minimum:
from streamalert.alert_processor.helpers import compose_alert

def get_user_defined_properties(self):
    """Returns any properties for this output that must be provided by the user

    At a minimum, this method should prompt the user for a 'descriptor' value to
    use for configuring any outputs added for this service.

    Returns:
        [OrderedDict] Contains various OutputProperty items
    """
    return OrderedDict([
        ('descriptor',
         OutputProperty(description='a short and unique descriptor for this service configuration '
                                    '(ie: name of integration/channel/service/etc)'))
    ])

def _dispatch(self, alert, descriptor):
    """Handles the actual sending of alerts to the configured service.

    Any external API calls for this service should be added here.
    This method should return a boolean where True means the alert was
    successfully sent.

    In general, use the compose_alert() method defined in
    streamalert.alert_processor.helpers when presenting the alert in a generic
    polymorphic format to be rendered on the chosen output integration. This is
    so specialized Publishers can modify how the alert is represented on the
    output.

    In addition, adding output-specific fields can be useful to offer more
    fine-grained control of the look and feel of an alert. For example, an
    optional field that directly controls a PagerDuty incident's title:
    - '@pagerduty.incident_title'

    When referencing an alert's attributes, reference the alert's field
    directly (e.g. alert.alert_id). Do not rely on the published alert.
    """
    publication = compose_alert(alert, self, descriptor)
    # ...
    return True

See below for more information on the OutputProperty object.
Implement the private __service__ property within the new subclass.
This should be a string value that corresponds to an identifier that best represents this service (eg: __service__ = 'aws-s3').
Add the @StreamAlertOutput class decorator to the new subclass so it is registered when the outputs module is loaded.
Extend the AlertProcessorTester.setup_outputs method in the streamalert_cli/test.py module to provide mock credentials for your new output.
The OutputProperty
Object¶
The OutputProperty
object used in get_user_defined_properties
is a namedtuple
consisting of a few properties:
- description
A description that is used when prompting the user for input. This is to help describe what is expected from the user for this property. At a bare minimum, this property should be set for all instances of OutputProperty. Default is: '' (empty string)
- value
The actual value that the user enters for this property. This is replaced using namedtuple._replace during user input. Default is: '' (empty string)
- input_restrictions
A set of character values that should be restricted from user input for this property. Default is: {' ', ':'}
- mask_input
A boolean that indicates whether the user's input should be masked using getpass during entry. This should be set for any input that is potentially sensitive. Default is: False
- cred_requirement
A boolean that indicates whether this value is required for API access with this service. Ultimately, setting this value to True indicates that the value should be encrypted and stored in Amazon Systems Manager. Default is: False
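Putting those defaults together, here is a standalone approximation of the OutputProperty namedtuple. The field defaults mirror the list above; this is a sketch, not the exact declaration from output_base.py:

```python
from collections import namedtuple

# Approximation of StreamAlert's OutputProperty with the documented defaults
OutputProperty = namedtuple(
    'OutputProperty',
    ['description', 'value', 'input_restrictions', 'mask_input', 'cred_requirement']
)
OutputProperty.__new__.__defaults__ = ('', '', {' ', ':'}, False, False)

# A masked, credential-bearing property, such as a webhook URL
url_prop = OutputProperty(
    description='the full webhook url, including the secret',
    mask_input=True,
    cred_requirement=True,
)
assert url_prop.input_restrictions == {' ', ':'}  # default applies

# User input is stored via namedtuple._replace, as described above
filled = url_prop._replace(value='hooks.example.com/services/abc')
assert filled.value == 'hooks.example.com/services/abc'
```

Because mask_input is True, the CLI would collect this value with getpass, and cred_requirement=True means the value is encrypted and stored rather than kept in plain configuration.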
Strategy¶
A common strategy that has been found to be effective:
Write your rule and only designate a notification-style service, such as Slack, as an output
Identify false positives, refine the rule over a period of time
“Promote” the rule to production by removing Slack and adding PagerDuty and S3 as outputs
Why:
Slack alerts are ephemeral, great for new/beta rules
PagerDuty supports webhooks and can still ping Slack
S3 will act as a persistent store for production alerts (audit trail, historical context)
Dynamic Outputs¶
Prerequisites¶
Any output assigned must be added with python manage.py output
Functions must return None, str, or List[str], which maps to an output configured with the above.
Only pass context if the rule sets context.
Overview¶
Adds the ability to have custom logic run to define an output
or outputs
based on information within the record
.
For information on supported outputs and how to add support for additional outputs, see outputs
As the examples below show, dynamic outputs are easy to configure and add a very useful capability to StreamAlert.
StreamAlert sends to all outputs defined within a rule's outputs=[] and dynamic_outputs=[] when sending Alerts.
It is also possible to pass context to the dynamic_function if the rule sets it.
Note
Any output
passed must be configured with ./manage.py output -h
Example: Simple¶
The below code block is considered a simple dynamic_output
function, because the outputs are dynamically configured, but the information used still lives within the code. It also:
allows you to maintain a static list of information inside your code
will return the outputs relevant to the team who “own” the account
Alerts are sent to the aws-sns:security output as well as those returned by the function
from streamalert.shared.rule import rule
def map_account_to_team(record):
    teams = {
        "team_a": {"accounts": ["123", "456", ...], "outputs": ["aws-sns:team_a"]},
        "team_b": {"accounts": ["789", ...], "outputs": ["aws-sns:team_b", "slack:team_b"]},
    }
    account_id = record.get('recipientaccountid')
    # iterate over the team configs, not the dict keys
    for team in teams.values():
        if account_id in team["accounts"]:
            return team["outputs"]
    # None is guarded against by StreamAlert
@rule(
logs=['cloudwatch:events'],
req_subkeys={
'detail': ['userIdentity', 'eventType']
},
outputs=["aws-sns:security"],
dynamic_outputs=[map_account_to_team]
)
def cloudtrail_root_account_usage(rec):
# Rule logic
Example: With LookupTables¶
With the simple addition of a lookup-table you can take a rule like cloudtrail_root_account_usage
and configure it as such:
from streamalert.shared.rule import rule
from streamalert.shared.lookup_tables.core import LookupTables
def dynamic_output_with_context(record, context): # pass context only if the rule added context
account_id = context["account_id"]
return LookupTables.get(
'my_lookup_table',
'aws-account-owner:{}'.format(account_id),
None
) # potentially returns [aws-sns:team_a]
@rule(
    logs=['cloudwatch:events'],
    outputs=["aws-sns:security"],
    dynamic_outputs=[dynamic_output_with_context],
    context={"account_id": "valid_account_id"},
)
def cloudtrail_root_account_usage(rec, context):
    context["account_id"] = rec.get('recipientaccountid')
    # Rule logic
The above has the benefit of using information that lives outside of StreamAlert, which means teams can acquire new accounts and get Alerts
without having to alter StreamAlert code.
Example: With Other Data Source¶
from streamalert.shared.rule import rule
import requests
def dynamic_output(record):
account_id = record.get('recipientaccountid')
# invoke an external API to get data back
response = requests.get("API/team_map")
for team in response.json():
if account_id in team["accounts"]:
return team["outputs"] # potentially "aws-lambda:team_a"
@rule(
logs=['cloudwatch:events'],
outputs=["aws-sns:security"],
dynamic_outputs=[dynamic_output],
)
def cloudtrail_root_account_usage(rec):
# Rule logic
The above example uses an external API to get the output map, which is to be queried with the account_id
on the record.
This is just an example, but hopefully highlights many ways in which dynamic_outputs
can be used.
Warning
The above example could result in many queries to the API in use and could potentially slow down StreamAlert
Lambdas when processing Alerts
.
Publishers¶
Publishers are a framework for transforming alerts prior to dispatching to outputs, on a per-rule basis. This allows users to customize the look and feel of alerts.
How do Publishers work?¶
Publishers are blocks of code that are run during alert processing, immediately prior to dispatching an alert to an output.
Implementing new Publishers¶
All publishers must be added to the publishers
directory. Publishers have two valid syntaxes:
Function¶
Implement a top-level function that accepts two arguments: an Alert and a dict. Decorate this function
with the @Register
decorator.
from streamalert.shared.publisher import Register
@Register
def my_publisher(alert: Alert, publication: dict) -> dict:
# ...
return {}
Class¶
Implement a class that inherits from AlertPublisher and fills in the implementation for publish().
Decorate the class with the @Register
decorator.
from streamalert.shared.publisher import AlertPublisher, Register
@Register
class MyPublisherClass(AlertPublisher):
def publish(self, alert: Alert, publication: dict) -> dict:
# ...
return {}
Recommended Implementation¶
Publishers should always return dicts containing only simple types (str, int, list, dict).
Publishers are executed in series, each passing its published Alert
to the next publisher. The publication
arg is the result of the previous publisher (or {}
if it is the first publisher in the series). Publishers
should freely add, modify, or delete fields from previous publications. However, publishers should avoid
doing in-place modifications of the publications, and should prefer to copy-and-modify:
from streamalert.shared.publisher import Register
@Register
def sample_publisher(alert, publication):
    # copy-and-modify, rather than mutating the previous publication in place
    publication = dict(publication)
    publication['new_field'] = 'new_value'
    publication.pop('old_field', None)
    return publication
Preparing Outputs¶
In order to take advantage of Publishers, all outputs must be implemented with the following guidelines:
Use compose_alert()
When presenting unstructured or miscellaneous data to an output (e.g. an email body, incident details),
outputs should be implemented to use the compose_alert(alert: Alert, output: OutputDispatcher, descriptor: str) -> dict
method.
compose_alert()
loads all publishers relevant to the given Alert
and executes these publishers in series,
returning the result of the final publisher.
All data returned by compose_alert()
should be assumed as optional.
from streamalert.alert_processor.helpers import compose_alert
def _dispatch(self, alert, descriptor):
# ...
publication = compose_alert(alert, self, descriptor)
make_api_call(misc_data=publication)
“Default” Implementations¶
For output-specific fields that are mandatory (such as an incident Title or assignee), each output should offer a default implementation:
def _dispatch(self, alert, descriptor):
default_title = 'Incident Title: #{}'.format(alert.alert_id)
default_html = '<html><body>Rule: {}</body></html>'.format(alert.rule_description)
# ...
Custom Fields¶
Outputs can be implemented to offer custom fields that can be filled in by Publishers. This (optionally) grants fine-grained control of outputs to Publishers. Such fields should adhere to the following conventions:
They are top level keys on the final publication dictionary
Keys are strings, following the format:
@{output_service}.{field_name}
Keys MUST begin with an at-sign
The output_service should match the current output's cls.__service__ value
The field_name should describe its function
Example: @slack.attachments
Below is an example of how you could implement an output:
def _dispatch(self, alert, descriptor):
# ...
publication = compose_alert(alert, self, descriptor)
default_title = 'Incident Title: #{}'.format(alert.alert_id)
default_html = '<html><body>Rule: {}</body></html>'.format(alert.rule_description)
title = publication.get('@pagerduty.title', default_title)
body_html = publication.get('@pagerduty.body_html', default_html)
make_api_call(title, body_html, data=publication)
Alert Fields¶
When outputs require mandatory fields that are not subject to publishers, they should reference the alert
fields directly:
def _dispatch(self, alert, descriptor):
rule_description = alert.rule_description
# ...
Registering Publishers¶
Register publishers on a rule using the publishers
argument on the @rule
decorator:
from publishers import publisher_1, publisher_2
from streamalert.shared.rule import rule
@rule(
logs=['stuff'],
outputs=['pagerduty', 'slack'],
publishers=[publisher_1, publisher_2]
)
def my_rule(rec):
# ...
The publishers
argument is a structure containing references to Publishers and can follow any of the
following structures:
Single Publisher¶
publishers=publisher_1
When using this syntax, the given publisher will be applied to all outputs.
List of Publishers¶
publishers=[publisher_1, publisher_2, publisher_3]
When using this syntax, all given publishers will be applied to all outputs.
Dict mapping Output strings to Publisher¶
publishers={
'pagerduty:analyst': [publisher_1, publisher_2],
'pagerduty': [publisher_3, publisher_4],
'demisto': other_publisher,
}
When using this syntax, publishers under each key will be applied to their matching outputs. Publisher keys
with generic outputs (e.g. pagerduty
) are loaded first, before publisher keys that pertain to more
specific outputs (e.g. pagerduty:analyst
).
The order in which publishers are loaded will dictate the order in which they are executed.
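A sketch of that ordering rule, assuming a hypothetical ordered_publisher_keys helper. Keys with fewer : separators are more generic and therefore load (and run) first:

```python
def ordered_publisher_keys(publishers, output):
    """Return the publisher dict keys applying to an output, generic service first."""
    service = output.split(':')[0]
    applicable = [key for key in publishers if key in (service, output)]
    # fewer ':' separators means more generic, so those sort (and load) first
    return sorted(applicable, key=lambda key: key.count(':'))

publishers = {
    'pagerduty:analyst': ['publisher_1'],
    'pagerduty': ['publisher_2'],
    'demisto': ['publisher_3'],
}

# The generic 'pagerduty' key loads before the more specific 'pagerduty:analyst'
assert ordered_publisher_keys(publishers, 'pagerduty:analyst') == ['pagerduty', 'pagerduty:analyst']
assert ordered_publisher_keys(publishers, 'demisto:main') == ['demisto']
```

Since later publishers receive the output of earlier ones, the more specific key's publishers can refine or override what the generic key's publishers produced.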
DefaultPublisher¶
When the publishers argument is omitted from a @rule, a DefaultPublisher is loaded and used. This also occurs when the publishers are misconfigured.
The DefaultPublisher is backward-compatible with old implementations of alert.output_dict().
Putting It All Together¶
Here’s a real-world example of how to effectively use Publishers and Outputs:
PagerDuty requires all Incidents be created with an Incident Summary, which appears as the title of every incident in its UI. Additionally, you can optionally supply custom details, which appear below the title as a large, unstructured body.
By default, the PagerDuty integration sends "StreamAlert Rule Triggered - rule_name"
as the Incident Summary,
along with the entire Alert record in the custom details.
However, the entire record can contain mostly irrelevant or redundant data, which can pollute the PagerDuty UI and make triage slower, as responders must filter through a large record to find the relevant pieces of information. This is especially true for alerts of very limited scope with well-understood remediation steps.
Consider an example where informational alerts are triggered upon login into a machine. Responders only care about the time of login, source IP address, and the username of the login.
You can implement a publisher that only returns those three fields and strips out the rest from the alert. The publisher can also simplify the PagerDuty title:
from streamalert.shared.publisher import Register

@Register
def simplify_pagerduty_output(alert, publication):
    return {
        '@pagerduty.record': {
            'source_ip': alert.record['source_ip'],
            'time': alert.record['timestamp'],
            'username': alert.record['user'],
        },
        '@pagerduty.summary': 'Machine SSH: {}'.format(alert.record['user']),
    }
Suppose this rule is being output to both PagerDuty and Slack, but you only wish to simplify the PagerDuty integration, leaving the Slack integration the same. Registering the publisher can be done as such:
from publishers.pagerduty import simplify_pagerduty_output
from streamalert.shared.rule import rule

@rule(
    logs=['ssh'],
    outputs=['slack:engineering', 'pagerduty:engineering'],
    publishers={
        'pagerduty:engineering': simplify_pagerduty_output,
    }
)
def machine_ssh_login(rec):
    # ...
Lookup Tables¶
LookupTables is a framework for injecting additional data into StreamAlert Lambda functions. LookupTables offers a unified key-value interface into a set of backend storage solutions, allowing StreamAlert Lambda functions to use state from outside of the raw telemetry that they receive.
With LookupTables, StreamAlert can hydrate alerting data, add statefulness to alerts, scalably pull down remote data, rapidly tune rule logic, and much more!
How do LookupTables work?¶
LookupTables provides a unified Python interface into backend data storage mechanisms. The two currently supported storage solutions are Amazon S3 and Amazon DynamoDB.
LookupTables makes these storage solutions available to StreamAlert’s Lambda functions. It is available on all classifiers, the rules engine, the alert merger, and the alert processor.
Usage¶
Pulling keys from LookupTables is very easy!
from streamalert.shared.lookup_tables.core import LookupTables
value = LookupTables.get('my-table', 'my-key', 'default-value')
The three arguments are as follows:
Table Name — The name of the LookupTable. This is specified in the config (below).
Key — A key on the given LookupTable.
Default Value — If the key is not found, this value will be returned instead. Notably, if the key is empty the empty value will be returned, NOT this default value.
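The default-value semantics can be illustrated with a plain dict standing in for a table (illustrative only; the real backend is S3 or DynamoDB):

```python
def lookup_get(table, key, default=None):
    """Mimics LookupTables.get: return the stored value if the key exists,
    even if that value is empty; the default only applies when the key is
    missing entirely."""
    return table[key] if key in table else default

table = {'ip_whitelist': [], 'host:web-01': {'ip': '10.0.0.5'}}

assert lookup_get(table, 'host:web-01', {}) == {'ip': '10.0.0.5'}
# An existing-but-empty value is returned, NOT the default
assert lookup_get(table, 'ip_whitelist', ['fallback']) == []
# A missing key falls back to the default
assert lookup_get(table, 'nonexistent', 'default-value') == 'default-value'
```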
Configuration¶
LookupTables is configured via a single file, conf/lookup_tables.json
.
{
"enabled": false,
"tables": {
"my-table-1": {
"driver": "dynamodb",
"table": "dynamodb-table-name",
"partition_key": "partition-key",
"value_key": "value-column",
"cache_maximum_key_count": 3,
"cache_refresh_minutes": 3,
"consistent_read": false
},
"my-table-2": {
"driver": "s3",
"bucket": "s3-bucket-name",
"key": "file.json",
"compression": false,
"cache_refresh_minutes": 10
}
}
}
enabled — (bool) Pass true to activate LookupTables. Leave false to disable.
tables — (dict) A dict mapping the name of a LookupTable to its corresponding configuration. The exact configuration varies from driver to driver. See below:
S3 Driver¶
This uses Amazon S3. It stores all LookupTables data into a single file in an S3 bucket, specified in the configuration:
{
"driver": "s3",
"bucket": "airbnb.sample.lookuptable",
"key": "resource_map.json.gz",
"compression": "gzip",
"cache_refresh_minutes": 10
}
driver — (str) Use s3
bucket — (str) The name of the S3 bucket. It must be in the same AWS account. NOTE: Multiple S3 LookupTables can use the same bucket, as long as they reference different keys.
key — (str) The S3 object key (aka filename) in the bucket.
compression — (str|bool) The compression algorithm of the S3 object. Currently only supports gzip. Pass false if the object is not compressed and is stored as JSON plaintext.
cache_refresh_minutes — (int) Number of minutes to cache the entire table. See the Caching section below.
The Nitty Gritty¶
Because the S3 driver stores all data in a single S3 file, it loads the entire table upfront. This is beneficial to StreamAlert code that scans multiple keys back-to-back, as only a single HTTP call will be made to S3, and subsequent calls will be made to the caching layer.
On the other hand, because the entire S3 file is loaded into memory, large S3 files can risk running into the memory ceiling of StreamAlert’s Lambda functions.
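As a sketch, a table file compatible with the S3 driver configuration above could be produced like this before uploading it to the bucket (the upload itself, via e.g. boto3, is omitted; the key names in the sample data are hypothetical):

```python
import gzip
import json

table_data = {'host:web-01': {'ip': '10.0.0.5'}, 'ip_whitelist': ['8.8.8.8']}

# The S3 driver expects a single JSON document; with "compression": "gzip"
# the object must be gzip-compressed, with "compression": false it is
# stored as JSON plaintext
body = gzip.compress(json.dumps(table_data).encode('utf-8'))

# Sanity check: the round trip restores the original table
assert json.loads(gzip.decompress(body)) == table_data
```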
DynamoDB Driver¶
This driver uses DynamoDB as the storage layer, storing individual keys as discrete rows on the DynamoDB table. The driver supports both tables with only a partition key and tables with both a partition and a sort key.
{
"driver": "dynamodb",
"table": "some_table_name",
"partition_key": "MyPartitionKey",
"sort_key": "MySortKey",
"value_key": "MyValueKey",
"consistent_read": false,
"key_delimiter": ":",
"cache_refresh_minutes": 2,
"cache_maximum_key_count": 10
}
driver — (str) Use dynamodb
table — (str) The name of the DynamoDB table. This table must be in the same AWS region as the StreamAlert deployment.
partition_key — (str) The name of the partition key. The partition key MUST be a string type.
sort_key — (str) (Optional) The name of the sort key, if one exists. The sort key MUST be a string type.
value_key — (str) The name of the value column. NOTE: Multiple LookupTables can be “overlapped” on a single DynamoDB table, using different value_keys.
consistent_read — (bool) (Optional) When true, it forces DynamoDB queries to be strongly consistent. This reduces performance (potentially increasing HTTP latency during DynamoDB calls), but guarantees that modified values to LookupTables will be immediately available. Passing false allows eventually consistent reads, which can greatly improve performance.
key_delimiter — (str) (Optional) When accessing keys in a DynamoDB LookupTable that uses both a partition_key and a sort_key, the syntax of the final key is {partition_key}{delimiter}{sort_key}. The default delimiter is a colon (:), but this parameter can be provided to offer a different delimiter.
cache_refresh_minutes — (int) Number of minutes to cache each individual key.
cache_maximum_key_count — (int) Maximum number of keys to cache on this LookupTable. Once the cache is full, keys will be evicted on a random-selection basis.
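The composite key syntax described for key_delimiter can be sketched as follows (an illustrative helper, not StreamAlert's implementation):

```python
def compose_key(partition_key, sort_key=None, delimiter=':'):
    """Builds the LookupTables key as {partition_key}{delimiter}{sort_key};
    tables with only a partition key use it directly."""
    if sort_key is None:
        return partition_key
    return '{}{}{}'.format(partition_key, delimiter, sort_key)

assert compose_key('host', 'web-01') == 'host:web-01'
assert compose_key('host', 'web-01', delimiter='|') == 'host|web-01'
assert compose_key('host') == 'host'
```

With the configuration above, a call such as LookupTables.get('my-table-1', 'host:web-01') would be split on the delimiter into the partition key 'host' and the sort key 'web-01'.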
The Nitty Gritty¶
The DynamoDB driver is designed to retrieve a minimal amount of data per request. This reduces the memory footprint compared to the S3 driver, and can reduce the Lambda memory limit required to prevent out-of-memory errors.
As a tradeoff, rapid back-to-back accesses of different keys will result in many HTTP calls being made to DynamoDB, which can slow down StreamAlert’s Lambda execution.
Caching¶
To reduce redundant requests to storage layers, LookupTables offers a simple in-memory caching layer.
It can be configured using the cache_refresh_minutes
configuration setting under each driver.
This will persist data retrieved from the storage solutions in memory for a number of minutes. This can increase Lambda memory consumption, but can also reduce runtime by reducing the number of HTTP calls.
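The interplay of cache_refresh_minutes and cache_maximum_key_count can be sketched as a small TTL cache with random eviction (illustrative only; this is not StreamAlert's actual caching code):

```python
import random
import time

class TTLCache:
    def __init__(self, refresh_minutes, maximum_key_count=None):
        self._ttl = refresh_minutes * 60
        self._max = maximum_key_count
        self._store = {}  # key -> (value, expiry timestamp)

    def get(self, key):
        entry = self._store.get(key)
        if entry and entry[1] > time.time():
            return entry[0]
        self._store.pop(key, None)  # expired or never cached
        return None

    def put(self, key, value):
        if self._max and len(self._store) >= self._max and key not in self._store:
            # Cache full: evict a key on a random-selection basis
            self._store.pop(random.choice(list(self._store)))
        self._store[key] = (value, time.time() + self._ttl)

cache = TTLCache(refresh_minutes=3, maximum_key_count=2)
cache.put('a', 1)
cache.put('b', 2)
cache.put('c', 3)  # cache is full, so either 'a' or 'b' is evicted
assert cache.get('c') == 3
```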
Putting Data Into LookupTables¶
It is not (yet) advisable for StreamAlert Lambdas to write values into LookupTables. Instead, external Lambdas (or other processes) should manage the data in LookupTables.
CLI Management¶
There is a StreamAlert CLI command for managing LookupTables, python manage.py lookup-tables
, with three subcommands:
describe-tables
get
set
Use the -h
flag to learn how to use them.
Best Practices¶
This section documents several best practices in no particular order.
Organize LookupTables Data¶
While LookupTables can store any data in any table under any key, for usage patterns that push scaling limits it is generally advisable to organize data into tables that are optimized for their access patterns.
Split the data into many LookupTables, each containing data with similar access patterns.
When to use S3, and when to use Dynamo¶
Because it can condense the entire data fetching process into a single HTTP request, the S3 driver functions most optimally with small data sets that are often accessed together or interdependently. It is generally inadvisable to store massive amounts of data on a single S3 file.
S3 is ideal for “table scan” types of data. For example, long lists of IP addresses, whitelists, or dict mappings of hosts to metadata. S3 is also ideal for data that is often used together.
Caching Best Practices¶
In practice, we haven’t found any reason to stress over these values; a setting of 5 or 10 minutes is enough.
More effective is to use the DynamoDB driver with cache_maximum_key_count
. This allows more fine-grained
control of the maximum memory consumption of the cache.
Prefer Eventually Consistent Reads¶
We strongly recommend allowing eventually consistent reads on the DynamoDB driver. The public SLA for eventually consistent reads is 20 seconds, with a typical delay of less than 3 seconds.
Deployment¶
When LookupTables are configured properly, a subsequent run of python manage.py generate or python manage.py build will create a new file, terraform/lookup_tables.tf.json, and build the appropriate IAM permissions for the StreamAlert Lambdas to access them.
It will not build the actual S3 buckets or DynamoDB tables, however. Those resources have to be built elsewhere.
Usage Ideas¶
Whitelist¶
Instead of placing whitelists inline in code:
IP_WHITELIST = [
'2.2.2.2',
'8.8.8.8',
'8.0.8.0',
]
Consider using LookupTables:
IP_WHITELIST = LookupTables.get('whitelists', 'ip_whitelist', [])
External Configuration¶
Suppose StreamAlert receives a piece of telemetry that includes a hostname:
{
"hostname": "securityiscool.airbnb.com",
...
}
But suppose the rules logic requires an IP address instead. LookupTables can be used to retrieve real-time DHCP or DNS information about that hostname, even if the IP address is not available in the original telemetry.
@rule(
    # ...
)
def my_rule(rec):
    hostname = get_key(rec, 'hostname')
    dns_metadata = LookupTables.get('dns_information', 'host:{}'.format(hostname), {})
    # rules logic here...
Apps¶
App Configuration¶
For StreamAlert and other related platforms, log forwarding is usually left as an exercise to the reader. This work is non-trivial and often requires new infrastructure and code. We wanted to make this easier for everyone and have achieved this through StreamAlert Apps.
Apps allow you to collect logs from popular services and applications in minutes. You simply provide the application’s credentials and StreamAlert will deploy an individual serverless application that will fetch and forward logs to StreamAlert for analysis and alerting.
Concepts¶
Apps are made possible through the use of AWS technologies:
Supported Services¶
Duo
Authentication Logs
Administrator Logs
OneLogin
Events Logs
G Suite Reports (Activities)
Admin
Calendar
Google Drive
Groups
Google Plus
Logins
Mobile Audit
Rules
SAML
Authorization Tokens
Box
Admin Events
Slack
Access Logs
Integrations Logs
Intercom
Admin Activity Logs
More to come
Getting Started¶
An initial deploy of StreamAlert must be performed before Apps can be configured. If you haven’t deployed StreamAlert yet, please visit the Getting Started page to get up and running.
Deploying an App only takes 3 steps:
1. Configure the App through the CLI (via python manage.py app new).
2. Enter the required authentication information.
3. Deploy the new App and the Classifier.
To get help configuring a new App, use:
python manage.py app new --help
Configuring an App¶
The StreamAlert CLI is used to add a new App configuration.
python manage.py app new \
--type duo_auth \
--cluster prod \
--name duo_prod_collector \
--interval 'rate(2 hours)' \
--timeout 80 \
--memory 128
Flag | Description
---|---
--type | Type of app integration function being configured. Current choices are: duo_auth, duo_admin
--cluster | Applicable cluster this function should be configured against.
--name | Unique name to be assigned to the App. This is useful when configuring multiple accounts per service.
--interval | The interval, defined using a ‘rate’ expression, at which this app integration function should execute. See AWS Schedule Rate Expressions.
--timeout | The AWS Lambda function timeout value, in seconds. This should be an integer between 10 and 300.
--memory | The AWS Lambda function max memory value, in megabytes. This should be an integer between 128 and 1536.
Note
Duo Security’s Admin API is limited to two (2) requests per minute. Therefore, setting the --timeout flag to any value between 10 and 60 will provide no additional value. A recommended timeout of 80 seconds will guarantee four (4) requests happen per execution.
Enter the Authentication Info¶
The above command will result in a few prompts asking for the required authentication information needed to configure this App.
Note
After the last required authentication value is entered, the values are sent to AWS SSM’s
Parameter Store as a SecureString
to be used as part of this App’s config. Due to this requirement, please ensure you have the correct
and valid AWS credentials loaded before continuing.
Please supply the API URL for your duosecurity instance. This should be in a format similar to 'api-abcdef12.duosecurity.com': api-abcdef12.duosecurity.com
Please supply the secret key for your duosecurity Admin API. This should be a string of 40 alphanumeric characters: 123424af2ae101d47d9704b783c940dffa825678
Please supply the integration key for your duosecurity Admin API. This should be in a format similar to 'DIABCDEFGHIJKLMN1234': DIABCDEFGHIJKLMN1234
Once the above is completed, a logger statement similar to the following will confirm the configuration:
StreamAlertCLI [INFO]: App authentication info successfully saved to parameter store.
StreamAlertCLI [INFO]: Successfully added 'duo_prod_collector' app integration to 'conf/clusters/prod.json' for service 'duo_auth'.
Your configuration file conf/clusters/<cluster>.json
has now been updated and is ready to be deployed.
Deploy the Functions¶
The recommended process is to deploy both the apps function and the classifier processor function with:
python manage.py deploy --functions classifier apps
Authorizing the Slack App¶
The Slack endpoint API requires a bearer token, obtained by going through the Slack OAuth authentication process. Only one path through the process is supported by the Slack App: manually installing a custom integration.
To obtain the bearer token, an administrator of the Slack workspace must create a custom Slack app, add the admin
permission scope to the custom app, and install the app to the target workspace.
Step by step:
1. Visit the Create a Slack app page, and in the Create a Slack App dialog box fill in the App Name field with whatever you like and select the target workspace from the Development Slack Workspace dropdown box. Click Create App.
2. On the Basic Information page of the app you just created, scroll to and click on OAuth & Permissions on the left hand sidebar.
3. Scroll to the Scopes section, click on the dropdown box under Select Permission Scopes, and type admin to bring up the administrator scope (labeled Administer the workspace). Select it, then click Save changes.
4. Scroll to the top of that same page and click on Install App to Workspace. Click Authorize on the next dialog. You should be returned to the OAuth & Permissions page.
5. The bearer token is the string labeled with OAuth Access Token and beginning with xoxp-. Provide this when configuring the Slack StreamAlert app.
Enabling the Aliyun App¶
The Aliyun API requires an access key and access key secret for an authorized user.
To obtain the access key and access key secret, an authorized user of the Aliyun account should follow their directions to Create an Access Key.
Additionally, the user for whom the access key was created must have sufficient privileges to make use of ActionTrail; follow the directions on the Grant ActionTrail permissions to RAM users page.
How to set up the Intercom App¶
The Intercom API requires an access token. Get an access token by following these instructions.
To specify an API version, follow these instructions to do so through Intercom’s Developer Hub. The default will be the latest stable version. The Intercom app works on versions 1.2 or later.
Updating an App’s Credentials¶
You may need to change an App’s credentials due to internal rotation policies or otherwise. The StreamAlert CLI allows you to easily update App credentials. To aid in this process, the CLI also gives you the ability to list currently configured Apps.
Listing Apps¶
To list currently configured Apps (grouped by cluster), use the CLI command:
python manage.py app list
Example output:
Cluster: prod
Name: duo_prod_collector
log_level: info
interval: rate(2 hours)
timeout: 80
memory: 128
type: duo_auth
Note
The output will show No Apps configured if you have not configured any Apps.
Updating Credentials¶
To update an App’s credentials, run the following command:
python manage.py app update-auth --cluster <cluster> --name <app_name>
This will have you follow a process similar to configuring a new App.
Developing a New App¶
An App can be created to collect logs from virtually any RESTful API that supports HTTP GET requests.
Developing an App for a currently unsupported service is as easy as:
1. Add a new file in streamalert/apps/_apps to correspond to the new service (eg: box.py).
2. Create a subclass of the AppIntegration class found in streamalert/apps/app_base.py.
3. Implement the required abstract properties and methods on the new subclass.
Example¶
This is a non-functional example of adding a new App for the Box Events API. It outlines which methods from the base AppIntegration class must be implemented and what those methods must do.
# streamalert/apps/_apps/box.py
import re

from . import AppIntegration, LOGGER, StreamAlertApp


@StreamAlertApp
class BoxApp(AppIntegration):
    """Box StreamAlert App"""
    _BOX_API_V2_EVENTS_ENDPOINT = 'https://api.box.com/2.0/events'
    _MAX_EVENTS_LIMIT = 500

    # Implement this abstractproperty
    @classmethod
    def service(cls):
        return 'box'

    # Implement this abstractproperty
    @classmethod
    def _type(cls):
        return 'admin_logs'

    # Implement this abstractmethod
    def required_auth_info(self):
        return {
            'secret_key': {
                'description': ('the secret key for this Box instance...'),
                'format': re.compile(r'...')
            },
            'client_id': {
                'description': ('the client_id for this Box instance...'),
                'format': re.compile(r'...')
            },
            'token': {
                'description': ('the token for this Box instance...'),
                'format': re.compile(r'...')
            }
        }

    # Implement this abstractmethod
    def _sleep_seconds(self):
        """Return the number of seconds this polling function should sleep for between requests

        Box imposes the following API limits: 10 API calls per second per user
        Box reference: https://developer.box.com/reference#rate-limiting

        Basically, this function should guarantee we sleep for 1 second every 10 requests

        Returns:
            int: Number of seconds that this function should sleep for between requests
        """
        return self._poll_count / 10 * 1

    # Implement this abstractmethod
    def _gather_logs(self):
        """Gather the Box event logs.

        This function should set a few things on the superclass:
            self._last_timestamp  # Set to the last timestamp/stream position from the logs
            self._more_to_poll    # Set to True if the max # of logs was polled this time

        Returns:
            list or bool: The list of logs fetched from the service, or False if
                there was an error during log collection.
        """
        headers = {'Authorization': 'Bearer {}'.format(self._get_oauth())}
        params = {'stream_position': self._last_timestamp,
                  'limit': self._MAX_EVENTS_LIMIT,
                  'stream_type': 'admin_logs'}

        # Make the request to the api, resulting in a bool or dict
        response = self._make_request(self._BOX_API_V2_EVENTS_ENDPOINT, headers=headers, params=params)
        if not response:
            return False

        logs = response['entries']

        # Set the last timestamp to the next stream position to be used in the next poll
        self._last_timestamp = response['next_stream_position']

        # Set self._more_to_poll to indicate there are more logs to collect
        self._more_to_poll = len(logs) >= self._MAX_EVENTS_LIMIT

        return logs

    def _get_oauth(self):
        """This should return the oauth token for this request"""
        secret_key = self._config.auth['secret_key']
        client_id = self._config.auth['client_id']
        token = self._config.auth['token']

        # Do something to generate oauth
        return generated_oauth
Metrics¶
StreamAlert allows you to enable Enhanced Monitoring to surface infrastructure metrics at a granular level.
When enabled, access them by going to AWS Console -> CloudWatch -> Metrics -> Kinesis.
Enhanced metrics can be enabled in conf/global.json
as shard_level_metrics
, for example:
"shard_level_metrics": [
"IncomingBytes",
"IncomingRecords",
"OutgoingBytes",
"OutgoingRecords",
"WriteProvisionedThroughputExceeded",
"ReadProvisionedThroughputExceeded",
"IteratorAgeMilliseconds"
]
These metrics can be viewed at the shard-level or the stream-level (cluster/environment).
All of CloudWatch’s features are at your disposal: graphing, dashboards, alerting, and more.
These metrics are useful for debugging, alerting on infrastructure metrics you care about, or for just getting a sense of the scale at which you’re analyzing and alerting on data.
Custom Metrics¶
By default, StreamAlert will log various custom metrics to AWS CloudWatch Metrics via Amazon CloudWatch Logs Metric Filters.
Amazon CloudWatch Logs Metric Filters utilize Filter Patterns to provide an extremely low-cost and highly scalable approach to tracking custom metrics.
The decision to use Metric Filters vs CloudWatch PutMetricData was made easy due to the limitations imposed by the PutMetricData API. StreamAlert has the potential to handle very high throughput, so the posting of metrics needed to be able to keep pace. Metric Filter Patterns leverage the runtime’s existing logger output in order to generate custom metrics that can be graphed, used for alarms, etc.
StreamAlert logs custom metrics on the cluster level and also on the aggregate (aka: global) level. For example, a metric for FailedParses will be created for each individual cluster that is configured. Along with publishing the metric to the respective cluster’s metrics, the metric will also get published to the aggregate value for this metric. Think of the aggregate as a summation of a metric across the entire StreamAlert deployment.
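The cluster/aggregate relationship can be sketched as follows (illustrative only; the real mechanism is CloudWatch Logs Metric Filters, not in-process counters):

```python
from collections import defaultdict

counters = defaultdict(int)

def log_metric(metric, value, cluster):
    # Each data point is published to the respective cluster's metric...
    counters[(metric, cluster)] += value
    # ...and also rolled into the aggregate (deployment-wide) metric
    counters[(metric, 'aggregate')] += value

log_metric('FailedParses', 2, 'prod')
log_metric('FailedParses', 3, 'corp')

# The aggregate is the summation of the metric across all clusters
assert counters[('FailedParses', 'prod')] == 2
assert counters[('FailedParses', 'corp')] == 3
assert counters[('FailedParses', 'aggregate')] == 5
```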
Custom metrics are logged to a unique StreamAlert namespace within CloudWatch Logs. Navigate to AWS Console -> CloudWatch -> Metrics -> StreamAlert to view these metrics.
Custom metrics definitions are found within streamalert/shared/metrics.py
.
Classifier Custom Metrics¶
FailedParses
FirehoseFailedRecords
FirehoseRecordsSent
NormalizedRecords
S3DownloadTime
SQSFailedRecords
SQSRecordsSent
TotalProcessedSize
TotalRecords
TotalS3Records
TotalStreamAlertAppRecords
TriggeredAlerts
Rules Engine Custom Metrics¶
FailedDynamoWrites
TriggeredAlerts
Alert Merger Custom Metrics¶
AlertAttempts
Toggling Custom Metrics¶
Logging of custom metrics will be enabled by default for the Lambda functions that support this feature.
To globally (for all clusters) disable custom metrics for the classifier function:
python manage.py custom-metrics --disable --functions classifier
To disable custom metrics for the classifier function within specific cluster:
python manage.py custom-metrics --disable --functions classifier --clusters <CLUSTER>
Swap the --disable flag for --enable in the above commands to have the inverse effect.
Alarms for Custom Metrics¶
With the addition of custom metrics comes the added bonus of CloudWatch alarms for custom metrics. StreamAlert’s CLI can be used to add alarms on custom metrics as you see fit. Custom metric alarms can be applied to both aggregate metrics (across all clusters) and to one or more clusters.
To get an up-to-date list of metrics to which alarms can be assigned on a cluster basis, run:
python manage.py create-cluster-alarm --help
To get an up-to-date list of metrics to which alarms can be assigned on an aggregate/global level, run:
python manage.py create-alarm --help
The required arguments for the create-alarm
and create-cluster-alarm
commands mimic what is
required by AWS CloudWatch Metric’s PutMetricAlarm API.
Example: FailedParses, Cluster¶
FailedParses alarm at the prod
cluster level
python manage.py create-cluster-alarm FailedParsesAlarm \
--metric FailedParses \
--metric-target cluster \
--comparison-operator GreaterThanOrEqualToThreshold \
--evaluation-periods 1 \
--period 600 \
--threshold 5.0 \
--alarm-description 'Trigger this alarm if 5 or more failed parses occur within a 10 minute period in the cluster "prod"' \
--clusters prod \
--statistic Sum
Example: TotalRecords, Global¶
TotalRecords alarm on a global level
python manage.py create-alarm MinimumTotalRecordsAlarm \
--metric TotalRecords \
--metric-target aggregate \
--comparison-operator LessThanThreshold \
--evaluation-periods 3 \
--period 600 \
--threshold 200000 \
--alarm-description 'Trigger this alarm if the total incoming records (aggregate) drops below 200000 for 3 consecutive 10 minute time periods in a row' \
--statistic Sum
The custom metric alarms will notify StreamAlert’s default SNS topic for monitoring: <prefix>_streamalert_monitoring
Rule Staging¶
Rule Staging allows dynamic toggling of rules from a staging to production state, and vice versa. Staged rules will only send alerts for historical retention, and the alerts will not be sent to any user-defined outputs, such as Slack, PagerDuty, etc.
To ensure that new rules do not flood alerting outputs with a ton of potential false positives, rules can be staged. After the initial staging period, wherein a noisy rule is tuned to limit extra noise or false positives, staged rules can be promoted.
When Rule Staging is enabled, new rules will, by default, be staged upon a deploy of the Rules Engine Lambda function. See the Skip Staging During Deploy section for more information.
Enabling Rule Staging¶
By default, the rule staging feature is not enabled. It can be enabled with the following command:
python manage.py rule-staging enable --true
The change will be reflected in the conf/global.json file.
For additional configuration options related to this feature, see the Rule Staging section of the Global Settings.
The initial implementation of the Rule Staging feature has a hard-coded ‘staging period’, or the time a rule should remain in staging before being auto-promoted to send to user-defined outputs. This is only relevant if the Rule Promotion feature is also enabled. The current default is 48 hours.
CLI Commands¶
There are a few CLI commands available to make management of staged rules easier.
Rule Status¶
The status of rules, meaning whether or not they are staged, can be determined with the following command:
python manage.py rule-staging status
Sample Output:
Rule Staged?
1: rule_001 False
2: rule_002 True
3: rule_003 False
4: rule_004 False
Toggling Staged Status¶
Staging a rule, or list of rules, is possible with the following command:
python manage.py rule-staging stage <rule_001> <rule_002>
Unstaging rules, enabling them to send to all user-defined outputs, is equally as easy and accomplished with the following command:
python manage.py rule-staging unstage <rule_001> <rule_002>
Skip Staging During Deploy¶
As noted above, all new rules will be staged by default during a Rules Engine deploy when the Rule Staging feature is enabled. There may, however, be occasions when all new rules should not be staged during a deploy. To allow for this, the Rules Engine can be deployed with the following command:
python manage.py deploy --functions rule --skip-rule-staging
This will force all new rules to send to user-defined outputs immediately upon deploy, bypassing
the default staging period. Alternatively, the --stage-rules
and --unstage-rules
flags
can be used (instead of the --skip-rule-staging
flag) to stage or unstage specific rules only.
Triaging Staged Rules¶
Once a rule is in staging, alerts generated by that rule can be queried in Athena:
SELECT 'rule_001' as rule_name, count(*) AS alert_count FROM alerts WHERE dt >= '2018-07-25-16' AND rule_name = 'rule_001' AND staged = True
rule_name | alert_count
---|---
rule_001 | 96
To help automate triaging of staged rules, StreamAlert includes an optional Rule Promotion Lambda function. This function can both send alert digests via email and auto-promote rules out of staging. See the Rule Promotion page for more detail.
Rule Promotion¶
To complement the Rule Staging feature, StreamAlert includes an optional Rule Promotion Lambda function. This Lambda function is invoked on a user-defined interval, and can automatically ‘promote’ rules out of staging.
Once rules are promoted, they will send alerts to all user-defined outputs. The function is also capable of sending digest emails to a Simple Notification Service (SNS) topic with statistics on how many alerts staged rules have generated.
Enabling Rule Promotion¶
Open the conf/lambda.json
file, and find the rule_promotion_config
section. Toggling the
enabled
flag to true
will allow for deployment of the Rule Promotion Lambda function.
Example:
conf/lambda.json
¶{
"rule_promotion_config": {
"enabled": true,
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"schedule_expression": "rate(10 minutes)",
"send_digest_schedule_expression": "cron(30 13 * * ? *)",
"timeout": 120
}
}
Configuration Options¶
A few additional configuration options are available to customize the function to your needs.
Key | Description
---|---
schedule_expression | How often the Rule Promotion Lambda function should be invoked in an attempt to promote currently staged rules. This should use AWS’s Rate or Cron Expression syntax.
send_digest_schedule_expression | When to invoke the Rule Promotion Lambda function to send the alert statistics digest. This should use AWS’s Rate or Cron Expression syntax.
Note
If either of the expressions used above are in the cron syntax, keep in mind the execution time will be relative to UTC, not local time.
The initial implementation of the Rule Promotion function has a hard-coded alert threshold, or the amount of alerts a rule can safely trigger and still be be auto-promoted to send to user-defined outputs.
The current default is 0, meaning any rule that is staged and triggers any alerts in the staging period (default of 48 hours) will not be auto-promoted. Manual promotion is possible via the command outlined in the Rule Staging CLI Commands section.
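Under the defaults described above (48-hour staging period, alert threshold of 0), the promotion check amounts to the following (an illustrative sketch, not the Rule Promotion function's actual code):

```python
from datetime import datetime, timedelta

ALERT_THRESHOLD = 0            # default: any alert blocks auto-promotion
STAGING_PERIOD = timedelta(hours=48)

def should_auto_promote(staged_at, alert_count, now):
    """A staged rule is auto-promoted only once its staging period has
    elapsed AND it triggered no more alerts than the threshold."""
    return now - staged_at >= STAGING_PERIOD and alert_count <= ALERT_THRESHOLD

now = datetime(2018, 7, 25)
staged_five_days_ago = datetime(2018, 7, 20)

assert should_auto_promote(staged_five_days_ago, 0, now) is True
assert should_auto_promote(staged_five_days_ago, 20, now) is False       # too noisy
assert should_auto_promote(datetime(2018, 7, 24), 0, now) is False       # still staging
```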
Deployment¶
Deploying the Rule Promotion Lambda function is similar to all of StreamAlert’s Lambda functions. The following command will create all of the necessary infrastructure and deploy the Rule Promotion function code.
python manage.py deploy --functions rule_promo
Note
After the above command is run and the function is deployed, users must subscribe to the SNS topic that is created in order to receive the alert statistics digest emails.
Alert Statistics Digest¶
The alert statistics digest that is sent to the SNS topic will contain information on staging times, as well as the amount of alerts the staged rule has generated to date. If alerts have been triggered, a link to Athena query results will also be included to assist in triaging them.
Sample Digest Email¶
Alert statistics for 2 staged rule(s) [2018-07-25 13:30:25.915131 UTC]
◦ rule_001
- Staged At: 2018-07-18 20:50:04.036690 UTC
- Staged Until: 2018-07-20 20:50:04.036690 UTC
- Time Past Staging: 4d 16h 40m
- Alert Count: 20
- Alert Info: https://console.aws.amazon.com/athena/home#query/history/0e86ea19-9449-4140-caaa-594b0979ed3d
◦ rule_002
- Staged At: 2018-07-23 22:30:38.823067 UTC
- Staged Until: 2018-07-25 22:30:38.823067 UTC
- Remaining Stage Time: 0d 9h 0m
- Alert Count: 2
- Alert Info: https://console.aws.amazon.com/athena/home#query/history/365853eb-ac8f-49b5-9118-c1c6479b2fbd
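The digest's elapsed-time fields ("Time Past Staging", "Remaining Stage Time") follow an Xd Yh Zm format. A small helper like the following (illustrative, not StreamAlert's code) reproduces it:

```python
from datetime import datetime, timezone

def stage_delta(now, staged_until):
    """Format time past or remaining in staging as 'Xd Yh Zm'."""
    seconds = abs((now - staged_until).total_seconds())
    days, rem = divmod(int(seconds), 86400)
    hours, rem = divmod(rem, 3600)
    minutes = rem // 60
    return '{}d {}h {}m'.format(days, hours, minutes)

# Matches the "Remaining Stage Time" line for rule_002 above
now = datetime(2018, 7, 25, 13, 30, tzinfo=timezone.utc)
until = datetime(2018, 7, 25, 22, 30, tzinfo=timezone.utc)
print(stage_delta(now, until))  # 0d 9h 0m
```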
Historical Search¶
StreamAlert's historical search feature is backed by Amazon S3 and Athena. By default, StreamAlert sends all alerts to S3, where they are searchable in an Athena table. Users also have the option to enable historical search for log data.
As of StreamAlert v3.1.0, a new field, file_format, has been added to athena_partitioner_config in conf/lambda.json, defaulting to null. This field allows users to configure how the data processed by the Classifier is stored in the S3 bucket: either parquet or json.
Prior to v3.1.0, all data was stored in json. When using this format, Athena's search performance degrades greatly as partition sizes grow. To address this, we've introduced support for parquet, which provides better Athena search performance and cost savings.
Note
When upgrading to StreamAlert v3.1.0, you must set the file_format value to either parquet or json; otherwise StreamAlert will raise a MisconfigurationError exception when running python manage.py build.
For existing deployments, the file_format value can be set to json to retain current functionality. However, if the file_format is changed to parquet, new Athena tables will need to be created to load the parquet format. The existing JSON data won't be searchable anymore unless a separate table is built to process data in JSON format. All of the underlying data remains stored in S3; there is no data loss.
For new StreamAlert deployments, it is recommended to set file_format to parquet to take advantage of better Athena search performance and cost savings when scanning data.
In an upcoming release, file_format will default to parquet, so we recommend switching now!
Architecture¶

The pipeline is:
StreamAlert creates an Athena database, an alerts Kinesis Firehose, and an alerts table during the initial deployment
Optionally, Firehose resources and Athena tables are created for historical data retention
S3 events are sent to an SQS queue that is mapped to the Athena Partitioner Lambda function
The Lambda function adds new partitions when new alerts or data are saved to the S3 bucket via Firehose
Alerts, and optionally data, are available for searching via the Athena console or the Athena API
Alerts Search¶
Review the settings for the Alerts Firehose Configuration and the Athena Partitioner function. Note that the Athena database and alerts table are created automatically when you first deploy StreamAlert.
If the file_format value within the Athena Partitioner function config is set to parquet, you can run the MSCK REPAIR TABLE alerts command in Athena to load all available partitions, after which alerts become searchable. Note, however, that the MSCK REPAIR command cannot load new partitions automatically.
StreamAlert includes a Lambda function to automatically add new partitions for Athena tables when the data arrives in S3. See Configure Lambda Settings
{
  "athena_partitioner_config": {
    "concurrency_limit": 10,
    "file_format": "parquet",
    "log_level": "info"
  }
}
Deploy the Athena Partitioner Lambda function
python manage.py deploy --functions athena
Search alerts in the Athena Console
Choose your Database from the dropdown on the left. The database name is <prefix>_streamalert
Write a SQL query in the Query Editor on the right
Data Search¶
It is optional to store data in S3 and make it available for search in Athena tables.
Enable Firehose in conf/global.json; see Firehose (Historical Data Retention)
Build the Firehose and Athena tables
python manage.py build
Deploy the classifier so it knows to send data to the S3 bucket via Firehose
python manage.py deploy --functions classifier
Search data in the Athena Console
Choose your Database from the dropdown on the left. The database name is <prefix>_streamalert
Write a SQL query in the Query Editor on the right
Configure Lambda Settings¶
Open conf/lambda.json and fill in the following options:
Key | Required | Default | Description
enabled | Yes | | Enables/Disables the Athena Partitioner Lambda function
enable_custom_metrics | No | false | Enables/Disables logging of metrics for the Athena Partitioner Lambda function
log_level | No | info | The log level for the Lambda function, can be either info or debug
memory | No | 128 | The amount of memory (in MB) allocated to the Lambda function
timeout | No | 60 | The maximum duration of the Lambda function (in seconds)
file_format | Yes | null | The alerts and data format stored in the S3 bucket via Firehose, can be either parquet or json
buckets | No | | Key value pairs of S3 buckets and associated Athena table names. By default, the alerts bucket will exist in each deployment.
Example:
{
"athena_partitioner_config": {
"log_level": "info",
"memory": 128,
"buckets": {
"alternative_bucket": "data"
},
"file_format": "parquet",
"timeout": 60
}
}
Athena References¶
Tip
Alerts and data are partitioned by dt in the format YYYY-MM-DD-hh
To improve query performance, filter data within a specific partition or range of partitions:
SELECT * FROM "<prefix>_streamalert"."alerts" WHERE dt BETWEEN '2020-02-28-00' AND '2020-02-29-00'
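The dt partition values follow the YYYY-MM-DD-hh format, so a small helper (illustrative, not part of StreamAlert) can generate the endpoints or full range for a BETWEEN clause:

```python
from datetime import datetime, timedelta

def partition_range(start, end):
    """Yield dt partition values (YYYY-MM-DD-hh) covering [start, end]."""
    current = start.replace(minute=0, second=0, microsecond=0)
    while current <= end:
        yield current.strftime('%Y-%m-%d-%H')
        current += timedelta(hours=1)

parts = list(partition_range(datetime(2020, 2, 28, 22), datetime(2020, 2, 29, 1)))
print(parts)  # ['2020-02-28-22', '2020-02-28-23', '2020-02-29-00', '2020-02-29-01']
```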
Scheduled Queries¶
Overview¶
Originally dubbed “StreamQuery”, this system allows you to execute Athena queries on a schedule, and funnel their results back into StreamAlert for rules analysis.
Because StreamAlert is mostly stateless, scheduled queries can allow you to correlate data together and analyze them automatically. Rules that were not previously possible can be written:
Detect X failed logins within Y minutes
Detect spike in API behavior that is correlated with an increase in # of a different alert/rule
Detect elevated API % error rates from specific IP address
How do scheduled queries work?¶
This system leverages two main components: AWS Lambda and AWS Step Functions.
First, a CloudWatch scheduled event triggers the execution of a new AWS Step Function State Machine. This State Machine manages the lifecycle of Athena queries through the Lambda function. Its sole responsibility is to execute the Lambda, wait a predefined window of time, and execute the Lambda again, repeating until the Lambda reports it is finished.
The Lambda function is a simple function that starts Athena queries, caches their execution ids, checks on their execution status, and uploads results to StreamAlert via Kinesis. Instead of doing all of these steps in a blocking fashion and sleeping while it waits for Athena, it runs through all queries in a single nonblocking pass, and returns the result of its execution to the State Machine. Once all queries have completed and their results sent to StreamAlert, the Lambda returns a “done” flag to the State Machine, signalling that this job has been finished.
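The State Machine / Lambda interaction described above can be sketched as a loop. Everything here (function names, state strings) is an illustrative assumption, with the real Athena calls indicated only in comments:

```python
import time

def lambda_pass(query_states):
    """One nonblocking pass: start pending queries, poll running ones."""
    for query, state in query_states.items():
        if state == 'pending':
            query_states[query] = 'running'    # would call start_query_execution
        elif state == 'running':
            query_states[query] = 'succeeded'  # would poll execution status
    return all(s == 'succeeded' for s in query_states.values())

def state_machine(query_states, wait_secs=0):
    """Invoke the Lambda repeatedly until it reports it is done."""
    passes = 1
    while not lambda_pass(query_states):
        time.sleep(wait_secs)  # the State Machine's Wait state
        passes += 1
    return passes

states = {'q1': 'pending', 'q2': 'pending'}
print(state_machine(states))  # 2 — one pass to start everything, one to observe completion
```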
Configuration¶
Scheduled Queries is configured via a single file, conf/scheduled_queries.json
.
{
"enabled": true,
"config": {
"destination_kinesis": "prefix_prod_streamalert",
"sfn_timeout_secs": 3600,
"sfn_wait_secs": 30
},
"packs": {
"hourly": {
"description": "Runs all hourly queries",
"schedule_expression": "rate(1 hour)"
},
"two_hour": {
"description": "Runs all queries run every two hours",
"schedule_expression": "rate(2 hours)"
},
"daily": {
"description": "Runs all daily queries",
"schedule_expression": "rate(24 hours)"
},
"two_day": {
"description": "Runs all queries that are run once every 2 days",
"schedule_expression": "rate(2 days)"
}
},
"lambda_config": {}
}
enabled — (bool) Pass true to activate scheduled queries. Leave false to disable.
config.destination_kinesis — (str) The name of the Kinesis stream to upload results to.
config.sfn_timeout_secs - (int) Max number of seconds for the state machine to execute.
config.sfn_wait_secs - (int) Time to wait between checks of query status.
query_packs - (dict) The keys of this dict are the names of the query packs. This section is discussed in more depth below.
Query Packs¶
Query Packs are batches of scheduled Athena queries that are executed together.
"query_packs": {
...
"hourly": {
"description": "Runs all hourly queries",
"schedule_expression": "rate(1 hour)"
},
...
}
description - (str) A string summary of what queries belong in this pack.
schedule_expression - (str) A CloudWatch schedule expression defining how frequently to execute this query pack.
Again, the keys to the query_packs dict are the names of the query packs. These names are used below.
Writing Queries¶
After you’ve defined a few Query Packs, it’s time to add actual scheduled queries.
All scheduled queries live in the scheduled_queries/ directory at the root of the project.
from streamalert.scheduled_queries.query_packs.configuration import QueryPackConfiguration
QueryPackConfiguration(
name='NAME_OF_QUERY_PACK',
description='Hey, hey! This is a description!',
# Make sure to edit the database name properly or this query will error with some
# "insufficient privileges errors"
query="""
SELECT
*
FROM
"ATHENA_DATABASE_NAME"."cloudwatch_cloudtrail"
WHERE
dt = '{utcdatehour_minus1hour}'
AND eventsource = 'athena.amazonaws.com'
AND eventname = 'StartQueryExecution'
""",
params=['utcdatehour_minus1hour'],
tags=['sample']
)
name - (str) The name of this query. This name is published in the final result, and is useful when writing rules.
description - (str) Description of this query. This is published in the final result.
query - (str) A template SQL statement sent to Athena, with query parameters identified {like_this}.
params - (list[str]|dict[str,callable]) Read on below…
tags - (list[str]) Tags required by this query to be run. The simplest way to use this is to put the query pack name into this array.
params¶
The “params” option specifies how to calculate special query parameters. It supports two formats.
The first format is a list of strings from a predefined set of strings. These have special values that are calculated at runtime, and are interpolated into the template SQL string. Here is a list of the supported strings:
The second format is a dictionary mapping parameter names to functions, like so:
def func1(date):
return date.timestamp()
def func2(date):
return LookupTables.get('aaaa', 'bbbb')
QueryPackConfiguration(
...
query="""
SELECT *
FROM stuff
WHERE
dt = '{my_param_1}'
AND p2 = '{my_param_2}'
""",
params={
'my_param_1': func1,
'my_param_2': func2,
}
)
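Conceptually, each callable receives a date and its return value is interpolated into the template SQL. A minimal sketch of that resolution step (an assumption for illustration, using plain str.format, which matches the {like_this} placeholder style):

```python
from datetime import datetime, timedelta, timezone

query = "SELECT * FROM stuff WHERE dt = '{my_param_1}'"
params = {
    # hypothetical param: one hour before the execution date, in partition format
    'my_param_1': lambda date: (date - timedelta(hours=1)).strftime('%Y-%m-%d-%H'),
}

def render(query, params, date):
    """Resolve each callable against the date, then interpolate into the SQL."""
    values = {name: func(date) for name, func in params.items()}
    return query.format(**values)

print(render(query, params, datetime(2020, 1, 1, 1, tzinfo=timezone.utc)))
# SELECT * FROM stuff WHERE dt = '2020-01-01-00'
```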
Writing Rules for StreamQuery¶
Classifier Schema¶
We provide an out-of-the-box sample schema for scheduled query v1.0.0 results. It is located at conf/schemas/streamquery.json.
What does a scheduled query result look like?¶
Below is an example of what StreamAlert may receive as a result from a scheduled query.
{
"streamquery_schema_version": "1.0.0",
"execution": {
"name": "query_name_goes_here",
"description": "This is an example",
"query": "SELECT *\nFROM my_database.my_table\nWHERE dt = '2020-01-01-01' LIMIT 10",
"query_parameters": {
"dt": "2020-01-01-01"
},
"data_scanned_in_bytes": 4783293,
"execution_time_ms": 12374,
"tags": [ "query_pack_1" ],
"query_execution_id": "123845ac-273b-ad3b-2812-9812739789",
"console_link": "https://console.amazonaws.com/athena/somethingsomething"
},
"data": {
"headers": [
"username",
"time"
],
"rows": [
{
"username": "bob",
"time": 1
},
{
"username": "sally",
"time": 2
},
{
"username": "joe",
"time": 3
}
],
"count": 3
}
}
Because the data of each query may be different, it is generally advisable to first write a StreamAlert
matcher on the execution.name value of the data. The rest is up to you!
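For example, the core of such a rule might check execution.name before inspecting the rows. This is a plain-Python sketch of that logic only; the actual StreamAlert rule decorator and registration are omitted, and the query name is the hypothetical one from the example above:

```python
def too_many_athena_queries(record):
    """Alert when the scheduled query named 'query_name_goes_here' returned rows."""
    # Match on the query name first, since each query's data shape differs
    if record.get('execution', {}).get('name') != 'query_name_goes_here':
        return False
    return record.get('data', {}).get('count', 0) > 0

result = {
    'streamquery_schema_version': '1.0.0',
    'execution': {'name': 'query_name_goes_here'},
    'data': {'headers': ['username'], 'rows': [{'username': 'bob'}], 'count': 1},
}
print(too_many_athena_queries(result))  # True
```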
Deployment¶
Deploying the various components of scheduled_queries is easy.
Building the Step Function, Lambda, and Query Packs¶
Anytime you change the configured query packs, you will need to run this to update the AWS Resources.
% ./manage.py build -t scheduled_queries
Deploying Python Code to Lambda¶
% ./manage.py deploy -f scheduled_queries
Best Practices¶
Use cron() instead of rate()¶
When defining schedule_expressions, it's safer to use cron(1 * * * *) than rate(1 hour). If you use
Terraform to build or rebuild your scheduled queries resources, you may end up recreating the
query pack. When using rate(1 hour), this causes the CloudWatch event to trigger immediately and then
repeat at 1 hour intervals. With cron(1 * * * *), it is easier to determine exactly when a query pack
will be executed: in this case, the 1st minute of every hour.
Be mindful of how much data is being sent¶
Athena queries can return a TON of data. Remember that this data has to fit in Lambda memory or it will crash your application. Try to structure your queries with GROUP BY statements or restrict the fields they operate on.
CAREFULLY CONSIDER Firehose’ing Scheduled Query results into Athena¶
It is theoretically possible to Firehose all StreamQuery results received by StreamAlert back into S3, using scheduled queries for data transformation.
We don’t really recommend doing this. This can add significantly more data to the pipeline, and usage of CREATE TABLE AS SELECT
is likely a more cost efficient choice.
Use dt BETWEEN, not dt > Queries¶
In queries, prefer to be explicit about which partitions to scan. Use clauses like these:
dt = {datehour}
dt BETWEEN {datehour_minus1hour} AND {datehour}
Avoid things like dt > {datehour_minus1hour}
. This creates time-sensitivity in your query, and
may cause it to return different results than expected if there is a delay in Step Function execution (see below).
Neat Little Details¶
Athena Queries are Incredibly Cheap¶
At $5 per 1 Terabyte scanned, Athena is absurdly cheap. Go nuts with your scheduled queries!
Failed State Machine Executions are Retriable¶
AWS Step Functions record every single execution of each State Machine, as well as each state change. Going to the console, you can observe that the Input event of a State Machine execution is simply a JSON blob:
{
"name": "streamalert_scheduled_queries_cloudwatch_trigger",
"event_id": "12345678-53e7-b479-0601-1234567890",
"source_arn": "arn:aws:events:us-east-1:123456789012:rule/myprefix_streamalert_scheduled_queries_event_0",
"streamquery_configuration": {
"clock": "2020-02-13T22:06:20Z",
"tags": [
"hourly"
]
}
}
Notice the “clock”. This value is generated at the time the CloudWatch scheduled event is triggered. Thus, if you start a new State Machine execution using the exact same Input event (with the same clock), the results of that execution will be exactly (mostly…) the same.
This is useful for replaying failed State Machine executions that result from Athena downtime or
deployed bugs. Simply use the AWS Console, navigate to any failed execution, and click the New Execution
button; a form will be shown with a copy of the Input event already pre-populated!
You can manually trigger query executions¶
Knowing the above, you can force StreamQuery to execute ad hoc queries simply by manually triggering State Machine executions, and passing in a correctly formatted Input event!
Make sure the Input event’s tags and clock are populated correctly to ensure the correct queries are executed.
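For example, a correctly formatted Input event can be assembled programmatically before pasting it into the console. The field names mirror the example above; the name, event_id, and source_arn values here are placeholders, not real resources:

```python
import json
from datetime import datetime, timezone

def build_input_event(tags, clock=None):
    """Assemble a replay Input event with the given tags and clock."""
    clock = clock or datetime.now(timezone.utc)
    return {
        'name': 'streamalert_scheduled_queries_cloudwatch_trigger',
        'event_id': 'manual-replay',   # placeholder
        'source_arn': 'manual',        # placeholder
        'streamquery_configuration': {
            'clock': clock.strftime('%Y-%m-%dT%H:%M:%SZ'),
            'tags': list(tags),
        },
    }

event = build_input_event(['hourly'], datetime(2020, 2, 13, 22, 6, 20, tzinfo=timezone.utc))
print(json.dumps(event, indent=2))
```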
Normalization¶
StreamAlert has an unannounced feature, Data Normalization. In its current implementation, it extracts recognized field names from classified records and saves them to a top-level key on the same record.
This is useful for rules, as they can be written to compare data fields against IoCs, such as an IP address, instead of writing one rule for each incoming data type. However, there are a couple of limitations we have identified while using Normalization internally.
Normalization 2.0 (Reboot)¶
In Normalization 2.0, we introduce a new Lambda function, Artifact Extractor, which leverages the Amazon Kinesis Data Firehose data transformation feature to extract interesting artifacts from records processed by the classifiers. The artifacts are stored in the same S3 bucket that the StreamAlert Historical Search feature uses, and the artifacts are available for searching via Athena as well.
Artifacts Inventory¶
An artifact is any field or subset of data within a record that bears meaning beyond the record itself, and is of interest in computer security. For example, a "carbonblack_version" would not be an artifact, as it is meaningless outside of the context of Carbon Black data. However, an ip_address would be an artifact.
The Artifact Extractor Lambda function builds an artifacts inventory based on the S3 and Athena services. It enables users to search for all artifacts across the whole infrastructure from a single Athena table.
Configuration¶
In Normalization v1, the normalized types are based on log source (e.g. osquery, cloudwatch, etc.) and defined in the conf/normalized_types.json file.
In Normalization v2, the normalized types are based on log type (e.g. osquery:differential, cloudwatch:cloudtrail, cloudwatch:events, etc.) and defined in conf/schemas/*.json. Please note, conf/normalized_types.json is deprecated.
All normalized types are arbitrary, but only lower case alphabetic characters and underscores should be used for names in order to be compatible with Athena.
Supported normalization configuration syntax:
"cloudwatch:events": {
  "schema": {
    "field1": "string",
    "field2": "string",
    "field3": "string"
  },
  "parser": "json",
  "configuration": {
    "normalization": {
      "normalized_key_name1": [
        {
          "path": ["path", "to", "original", "key"],
          "function": "The purpose of normalized_key_name1",
          "condition": {
            "path": ["path", "to", "other", "key"],
            "is|is_not|in|not_in|contains|not_contains": "string or a list"
          },
          "send_to_artifacts": true|false
        }
      ]
    }
  }
}
normalized_key_name1: An arbitrary string naming the normalized key, e.g. ip_address, hostname, command, etc.
path: A list containing a JSON path to the original key which will be normalized.
function: Describes the purpose of the normalizer.
condition: An optional block that is evaluated first. If the condition is not met, the normalizer is skipped.
path: A list containing a JSON path to the condition key.
is|is_not|in|not_in|contains|not_contains: Exactly one of these fields must be provided. This is the value that the conditional field is compared against, e.g.
"condition": {
  "path": ["account"],
  "is": "123456"
}
"condition": {
  "path": ["detail", "userIdentity", "userName"],
  "in": ["root", "test_username"]
}
Note
Use an all-lowercase string, or a list of all-lowercase strings, in the conditional field. The value from the record will be converted to lowercase before comparison.
send_to_artifacts: A boolean flag indicating whether the normalized information should be sent to the artifacts table. This field is optional and defaults to true; all normalized information is treated as artifacts unless this flag is explicitly set to false.
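A sketch of how these condition operators could be evaluated, including the lowercasing noted above. This is illustrative only, not StreamAlert's actual implementation:

```python
def get_path(record, path):
    """Walk a JSON path; missing keys resolve to an empty dict."""
    for key in path:
        record = record.get(key, {}) if isinstance(record, dict) else {}
    return record

def condition_met(record, condition):
    """Evaluate exactly one of the supported condition operators."""
    value = str(get_path(record, condition['path'])).lower()
    if 'is' in condition:
        return value == condition['is']
    if 'is_not' in condition:
        return value != condition['is_not']
    if 'in' in condition:
        return value in condition['in']
    if 'not_in' in condition:
        return value not in condition['not_in']
    if 'contains' in condition:
        return condition['contains'] in value
    if 'not_contains' in condition:
        return condition['not_contains'] not in value
    return False

record = {'detail': {'userIdentity': {'userName': 'Root'}}}
cond = {'path': ['detail', 'userIdentity', 'userName'], 'in': ['root', 'test_username']}
print(condition_met(record, cond))  # True — the record value is lowercased first
```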
Below are some example configurations for normalization v2.
Normalize all IP addresses (ip_address) and user identities (user_identity) for cloudwatch:events logs
conf/schemas/cloudwatch.json
"cloudwatch:events": {
  "schema": {
    "account": "string",
    "detail": {},
    "detail-type": "string",
    "id": "string",
    "region": "string",
    "resources": [],
    "source": "string",
    "time": "string",
    "version": "string"
  },
  "parser": "json",
  "configuration": {
    "normalization": {
      "ip_address": [
        {
          "path": ["detail", "sourceIPAddress"],
          "function": "Source IP addresses"
        }
      ],
      "user_identity": [
        {
          "path": ["detail", "userIdentity", "type"],
          "function": "User identity type",
          "send_to_artifacts": false
        },
        {
          "path": ["detail", "userIdentity", "arn"],
          "function": "User identity arn"
        },
        {
          "path": ["detail", "userIdentity", "userName"],
          "function": "User identity username"
        }
      ]
    }
  }
}
Normalize all commands (command) and file paths (file_path) for osquery:differential logs
conf/schemas/osquery.json
"osquery:differential": {
  "schema": {
    "action": "string",
    "calendarTime": "string",
    "columns": {},
    "counter": "integer",
    "decorations": {},
    "epoch": "integer",
    "hostIdentifier": "string",
    "log_type": "string",
    "name": "string",
    "unixTime": "integer",
    "logNumericsAsNumbers": "string",
    "numerics": "string"
  },
  "parser": "json",
  "configuration": {
    "optional_top_level_keys": [
      "counter",
      "decorations",
      "epoch",
      "log_type",
      "logNumericsAsNumbers",
      "numerics"
    ],
    "normalization": {
      "command": [
        {
          "path": ["columns", "command"],
          "function": "Command line from shell history"
        }
      ],
      "file_path": [
        {
          "path": ["columns", "history_file"],
          "function": "Shell history file path"
        }
      ]
    }
  }
}
Normalize usernames (user_identity) for cloudwatch:events logs when a certain condition is met. In the following example, it will only normalize usernames related to AWS accounts 11111111 and 22222222.
conf/schemas/cloudwatch.json
"cloudwatch:events": {
  "schema": {
    "account": "string",
    "detail": {},
    "detail-type": "string",
    "id": "string",
    "region": "string",
    "resources": [],
    "source": "string",
    "time": "string",
    "version": "string"
  },
  "parser": "json",
  "configuration": {
    "normalization": {
      "user_identity": [
        {
          "path": ["detail", "userIdentity", "userName"],
          "function": "User identity username",
          "condition": {
            "path": ["account"],
            "in": ["11111111", "22222222"]
          }
        }
      ]
    }
  }
}
Deployment¶
Artifact Extractor will only work if Firehose and Historical Search are enabled in conf/global.json
"infrastructure": {
  ...
  "firehose": {
    "use_prefix": true,
    "buffer_interval": 60,
    "buffer_size": 128,
    "enabled": true,
    "enabled_logs": {
      "cloudwatch": {},
      "osquery": {}
    }
  }
  ...
}
Enable the Artifact Extractor feature in conf/global.json
"infrastructure": {
  "artifact_extractor": {
    "enabled": true,
    "firehose_buffer_size": 128,
    "firehose_buffer_interval": 900
  },
  "firehose": {
    "use_prefix": true,
    "buffer_interval": 60,
    "buffer_size": 128,
    "enabled": true,
    "enabled_logs": {
      "cloudwatch": {},
      "osquery": {}
    }
  }
  ...
}
Running the build CLI will add a few more resources for the Artifact Extractor feature:
A new Glue catalog table, artifacts, for Historical Search via Athena
A new Firehose to deliver artifacts to the S3 bucket
New permissions
python manage.py build --target artifact_extractor
Then we can deploy the classifier to enable the Artifact Extractor feature.
python manage.py deploy --functions classifier
Note
If the normalization configuration has changed in conf/schemas/*.json, make sure to deploy the classifier Lambda function for the changes to take effect.
Custom Metrics¶
Three additional custom metrics are added to the Classifier for artifact statistics:
ExtractedArtifacts: Logs the number of artifacts extracted from the records
FirehoseFailedArtifacts: Logs the number of records (artifacts) that failed to send to Firehose
FirehoseArtifactsSent: Logs the number of records (artifacts) sent to Firehose
By default, the custom metrics should be enabled in the Classifier, for example in conf/clusters/prod.json:
{
  "id": "prod",
  "classifier_config": {
    "enable_custom_metrics": true,
    ...
  }
}
Then build the metric filters:
python manage.py build --target "metric_filters_*"
Artifacts¶
Artifacts will be searchable within the Athena artifacts table, while the original logs remain searchable within their dedicated tables.
Search cloudwatch:events logs:
SELECT * FROM PREFIX_streamalert.cloudwatch_events WHERE dt='2020-06-22-23'
(click to enlarge)¶
All artifacts, including artifacts extracted from cloudwatch:events, will live in the artifacts table.
SELECT * FROM PREFIX_streamalert.artifacts WHERE dt='2020-06-22-23'
(click to enlarge)¶
(Advanced) Use a join search to find the original records associated with the artifacts by streamalert_record_id
SELECT artifacts.*, cloudwatch.*
FROM (
  SELECT streamalert_record_id AS record_id, type, value
  FROM PREFIX_streamalert.artifacts
  WHERE dt='2020-06-22-23' AND type='user_identity' AND LOWER(value)='root'
  LIMIT 10
) AS artifacts
LEFT JOIN (
  SELECT streamalert_normalization['streamalert_record_id'] AS record_id, detail
  FROM PREFIX_streamalert.cloudwatch_events
  WHERE dt='2020-06-22-23'
  LIMIT 10
) AS cloudwatch
ON artifacts.record_id = cloudwatch.record_id
(click to enlarge)¶
Note
Instead of issuing two searches, we can use a JOIN statement to search across both tables at once to find the original record(s) associated with the interesting artifacts. This requires the streamalert_normalization field, which contains streamalert_record_id, to be searchable in the original table. The current process is to add the streamalert_normalization field as a top-level optional key to the schema.
Update the schema in conf/schemas/cloudwatch.json
"cloudwatch:events": {
  "schema": {
    "account": "string",
    "detail": {},
    "detail-type": "string",
    "id": "string",
    "region": "string",
    "resources": [],
    "source": "string",
    "streamalert_normalization": {},
    "time": "string",
    "version": "string"
  },
  "parser": "json",
  "configuration": {
    "optional_top_level_keys": [
      "streamalert_normalization"
    ],
    "normalization": {
      "user_identity": [
        {
          "path": ["detail", "userIdentity", "type"],
          "function": "User identity type"
        },
        {
          "path": ["detail", "userIdentity", "arn"],
          "function": "User identity arn"
        },
        {
          "path": ["detail", "userIdentity", "userName"],
          "function": "User identity username"
        }
      ]
    }
  }
}
Apply the change by running
python manage.py build --target "kinesis_firehose_*"
Considerations¶
The Normalization Reboot brings good value in terms of how easy it becomes to search for artifacts across an organization's entire infrastructure. It also makes it possible to write more efficient scheduled queries and have correlated alerting in place. However, it is worth mentioning some tradeoffs: it requires additional resources and adds data delay.
Increase in data footprint: Each original record can add many artifacts. In practice, this is unlikely to be a huge issue, as each artifact is very small and only contains a few fields.
Additional delay: Firehose data transformation adds up to 900 additional seconds of delay before the data is available for historical search. 900 seconds is a configurable setting on the Firehose from which the artifacts are extracted; reduce the Firehose buffer_interval value if you want to reduce the delay.
High memory usage: The Artifact Extractor Lambda function may need at least 3x the max buffer size of the Firehoses from which the artifacts are extracted, because the function does a lot of data copying. This may be improved with more efficient code in the Artifact Extractor Lambda function.
Example Schemas¶
For additional background on schemas, see Schemas
JSON¶
CloudWatch¶
Example Log¶
{
"version": "0",
"id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
"detail-type": "EC2 Instance State-change Notification",
"source": "aws.ec2",
"account": "111122223333",
"time": "2015-12-22T18:43:48Z",
"region": "us-east-1",
"resources": [
"arn:aws:ec2:us-east-1:123456789012:instance/i-12345678"
],
"detail": {
"instance-id": "i-12345678",
"state": "terminated"
}
}
Schema¶
{
"cloudwatch:ec2_event": {
"schema": {
"version": "string",
"id": "string",
"detail-type": "string",
"source": "string",
"account": "integer",
"time": "string",
"region": "string",
"resources": [],
"detail": {
"instance-id": "string",
"state": "string"
}
},
"parser": "json"
}
}
Inspec¶
Example Log¶
{
"version": "1.4.1",
"profiles": [
{
"supports": [],
"controls": [
{
"title": null,
"desc": null,
"impact": 0.5,
"refs": [],
"tags": {},
"code": "code-snip",
"source_location": {
"ref": "/lib/inspec/control_eval_context.rb",
"line": 87
},
"id": "(generated from osquery.rb:6 de0aa7d2405c27dfaf34a56e2aa67842)",
"results": [
{
"status": "passed",
"code_desc": "File /var/osquery/osquery.conf should be file",
"run_time": 0.001332,
"start_time": "2017-01-01 00:00:00 -0700"
}
]
}
],
"groups": [
{
"title": null,
"controls": [
"(generated from osquery.rb:1 813971f93b6f1a66e85f6541d49bbba5)",
"(generated from osquery.rb:6 de0aa7d2405c27dfaf34a56e2aa67842)"
],
"id": "osquery.rb"
}
],
"attributes": []
}
],
"other_checks": [],
"statistics": {
"duration": 0.041876
}
}
Schema¶
{
"inspec": {
"schema": {
"title": "string",
"desc": "string",
"impact": "float",
"refs": [],
"tags": {},
"code": "string",
"id": "string",
"source_location": {
"ref": "string",
"line": "integer"
},
"results": []
},
"parser": "json",
"configuration": {
"json_path": "profiles[*].controls[*]"
}
}
}
Box.com¶
Example Log¶
{
"source": {
"item_type": "file",
"item_id": "111111111111",
"item_name": "my-file.pdf",
"parent": {
"type": "folder",
"name": "Files",
"id": "22222222222"
}
},
"created_by": {
"type": "user",
"id": "111111111",
"name": "User Name",
"login": "user.name@domain.com"
},
"created_at": "2017-01-01T00:00:00-07:00",
"event_id": "111ccc11-7777-4444-aaaa-dddddddddddddd",
"event_type": "EDIT",
"ip_address": "127.0.0.1",
"type": "event",
"session_id": null,
"additional_details": {
"shared_link_id": "sadfjaksfd981348fkdqwjwelasd9f8",
"size": 14212335,
"ekm_id": "111ccc11-7777-4444-aaaa-dddddddddd",
"version_id": "111111111111",
"service_id": "5555",
"service_name": "Box Sync for Mac"
}
}
Schema¶
{
"box": {
"schema": {
"source": {
"item_type": "string",
"item_id": "integer",
"item_name": "string",
"parent": {
"type": "string",
"name": "string",
"id": "integer"
}
},
"created_by": {
"type": "string",
"id": "integer",
"name": "string",
"login": "string"
},
"created_at": "string",
"event_id": "string",
"event_type": "string",
"ip_address": "string",
"type": "string",
"session_id": "string",
"additional_details": {}
},
"parser": "json"
}
}
CloudWatch VPC Flow Logs¶
AWS VPC Flow Logs can be delivered to StreamAlert via CloudWatch.
CloudWatch logs are delivered as a nested record, so we will need to pass configuration
options to the parser to find the nested records:
Schema¶
{
"cloudwatch:flow_logs": {
"schema": {
"protocol": "integer",
"source": "string",
"destination": "string",
"srcport": "integer",
"destport": "integer",
"action": "string",
"packets": "integer",
"bytes": "integer",
"windowstart": "integer",
"windowend": "integer",
"version": "integer",
"eni": "string",
"account": "integer",
"flowlogstatus": "string"
},
"parser": "json",
"configuration": {
"json_path": "logEvents[*].extractedFields",
"envelope_keys": {
"logGroup": "string",
"logStream": "string",
"owner": "integer"
}
}
}
}
osquery¶
Osquery’s schema changes depending on the SELECT
statement used and the table queried. There are several options when writing schemas for these logs.
Schema, Option #1¶
Define a schema for each table used:
{
"osquery:etc_hosts": {
"parser": "json",
"schema": {
"name": "string",
"columns": {
"address": "string",
"hostnames": "string"
},
"action": "string",
"field...": "type..."
}
},
"osquery:listening_ports": {
"parser": "json",
"schema": {
"name": "string",
"columns": {
"pid": "integer",
"port": "integer",
"protocol": "integer",
"field...": "type..."
},
"action": "string",
"field...": "type..."
}
}
}
This approach promotes Rule safety, but requires additional time to define the schemas.
Schema, Option #2¶
Define a “loose” schema which captures arbitrary values for a given field:
{
"osquery:differential": {
"parser": "json",
"schema": {
"name": "string",
"hostIdentifier": "string",
"calendarTime": "string",
"unixTime": "integer",
"columns": {},
"action": "string"
}
}
}
Note
The value for columns
above of {}
indicates that a map with any key/value pairs is acceptable.
Warning
In Option 2, the schema definition is flexible, but Rule safety is lost, because you will need to use defensive programming when accessing and analyzing fields in columns. The use of req_subkeys is advised in this case; see Rules for additional details.
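For example, when columns is a loose map, a rule should access its fields defensively rather than assuming keys and types. A minimal illustration (the field names come from the listening_ports example above):

```python
def listening_on_port(record, port):
    """Safely check a loose osquery record for a given listening port."""
    columns = record.get('columns') or {}
    try:
        # 'port' may be absent, or a string rather than an integer
        return int(columns.get('port', -1)) == port
    except (TypeError, ValueError):
        return False

print(listening_on_port({'columns': {'port': '22'}}, 22))  # True
print(listening_on_port({'columns': {}}, 22))              # False
print(listening_on_port({}, 22))                           # False
```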
CSV¶
See CSV Parsing
Key-Value (KV)¶
auditd¶
Example Log¶
type=SYSCALL msg=audit(1364481363.243:24287): arch=c000003e syscall=2 success=no exit=-13
a0=7fffd19c5592 a1=0 a2=7fffd19c4b50 a3=a items=1 ppid=2686 pid=3538 auid=500 uid=500
gid=500 euid=500 suid=500 fsuid=500 egid=500 sgid=500 fsgid=500 tty=pts0 ses=1 comm="cat"
exe="/bin/cat" subj=unconfined_u:unconfined_r:unconfined_t:s0-s0:c0.c1023 key="sshd_config"
type=CWD msg=audit(1364481363.243:24287): cwd="/home/shadowman" type=PATH msg=audit(1364481363.243:24287):
item=0 name="/etc/ssh/sshd_config" inode=409248 dev=fd:00 mode=0100600 ouid=0 ogid=0
rdev=00:00 obj=system_u:object_r:etc_t:s0
Schema¶
{
"example_auditd": {
"parser": "kv",
"schema": {
"type": "string",
"msg": "string",
"arch": "string",
"syscall": "string",
"success": "string",
"exit": "string",
"a0": "string",
"a1": "string",
"a2": "string",
"a3": "string",
"items": "string",
"ppid": "integer",
"pid": "integer",
"auid": "integer",
"uid": "integer",
"gid": "integer",
"euid": "integer",
"suid": "integer",
"fsuid": "integer",
"egid": "integer",
"sgid": "integer",
"fsgid": "integer",
"tty": "string",
"ses": "string",
"comm": "string",
"exe": "string",
"subj": "string",
"key": "string",
"type_2": "string",
"msg_2": "string",
"cwd": "string",
"type_3": "string",
"msg_3": "string",
"item": "string",
"name": "string",
"inode": "string",
"dev": "string",
"mode": "integer",
"ouid": "integer",
"ogid": "integer",
"rdev": "string",
"obj": "string"
},
"configuration": {
"delimiter": " ",
"separator": "="
}
}
}
Note
The value for parser above should be set to kv for key-value parsing. The delimiter and separator keys within configuration indicate the values to use for the record delimiter and field separator, respectively.
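A simplified sketch of how this kind of key-value parsing behaves, including the duplicate-key suffixing (type, type_2, type_3) seen in the schema above; this is an illustration of the configuration, not StreamAlert's actual kv parser:

```python
# Naive KV parsing sketch: delimiter " " between pairs, separator "=" within
# a pair, and repeated keys suffixed the way the auditd schema above expects.

def parse_kv(line, delimiter=' ', separator='='):
    """Parse a key-value log line into a dict, suffixing duplicate keys."""
    record = {}
    for token in line.split(delimiter):
        if separator not in token:
            continue  # skip stray tokens with no separator
        key, _, value = token.partition(separator)
        if key in record:
            n = 2
            while f'{key}_{n}' in record:
                n += 1
            key = f'{key}_{n}'  # type -> type_2 -> type_3, etc.
        record[key] = value.strip('"')
    return record

parsed = parse_kv('type=SYSCALL msg=audit(123:1): pid=3538 type=CWD cwd="/home/shadowman"')
```

Note that real auditd lines can contain quoted values with embedded spaces, which this naive split does not handle; it exists only to show what delimiter and separator control.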
Syslog¶
See Syslog Parsing
Troubleshooting¶
Kinesis Data Streams¶
As detailed in other sections, StreamAlert utilizes Amazon Kinesis Data Streams.
Review Kinesis Streams key concepts
ThroughputExceeded¶
Pertains to WriteProvisionedThroughputExceeded or ProvisionedThroughputExceededException.
The documentation above states:
“Each shard can support up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys)”
“Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MB, up to a limit of 5 MB for the entire request, including partition keys”
If you’re experiencing either error, at least one of the following holds true:
You are exceeding 1,000 records/s of writes on at least one shard
You are exceeding 1 MB/s of writes on at least one shard
You sent more than 500 records in a single PutRecords request
You sent a record larger than 1 MB
You are sending more than 5 MB in a single PutRecords request
(1), (2), and (5) can be addressed by allocating more shards or using a better partition key
(3) and (4) can be addressed by using code or agents with the proper limit checks
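The client-side limit checks for (3) and (4) can be sketched in a few lines of Python. This assumes records are already encoded as bytes; note that the real limits also count partition key sizes, which this sketch ignores:

```python
# Sketch of client-side limit checks for PutRecords, per the quoted limits:
# <= 500 records per request, <= 1 MB per record, <= 5 MB per request.
MAX_RECORDS_PER_REQUEST = 500
MAX_RECORD_BYTES = 1024 * 1024
MAX_REQUEST_BYTES = 5 * 1024 * 1024

def batch_records(records):
    """Split encoded records into PutRecords-sized batches; drop oversized records."""
    batches, batch, batch_bytes = [], [], 0
    for data in records:
        size = len(data)
        if size > MAX_RECORD_BYTES:
            continue  # too large for any request; log and drop (or split upstream)
        if len(batch) == MAX_RECORDS_PER_REQUEST or batch_bytes + size > MAX_REQUEST_BYTES:
            batches.append(batch)
            batch, batch_bytes = [], 0
        batch.append(data)
        batch_bytes += size
    if batch:
        batches.append(batch)
    return batches

# 1200 records of 1 KB each split into request-sized batches
batches = batch_records([b'x' * 1000] * 1200)
```

Agents such as the Kinesis Agent perform equivalent checks internally; hand-rolled producers are where these errors most often originate.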
How you set up your partition keys depends on your use-cases, scale, and how you’re sending your data.
In our experience, there are three common use-cases:
No partition key (small scale)
Per-batch partition key (medium scale)
Per-record partition key (larger scale)
Explanation: A PutRecords request can contain up to 500 records totaling at most 5 MB. If you’re using a per-batch partition key, you’re attempting to send up to 5 MB to a single shard that has a limit of 1 MB/s. Keep in mind: if your code/agent uses splay or has reasonable retry logic, an error or exception does not imply data loss, and this may still be a viable strategy.
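A per-record partition key (the larger-scale case) can be sketched as follows; the StreamName value shown in the comment is hypothetical, and the boto3 call is included for shape only:

```python
# Hedged sketch: one random partition key per record spreads writes across
# shards, instead of pinning an entire batch to a single 1 MB/s shard.
import uuid

def build_put_records_entries(records):
    """Build PutRecords entries with a random partition key per record."""
    return [
        {'Data': data, 'PartitionKey': uuid.uuid4().hex}
        for data in records
    ]

entries = build_put_records_entries([b'rec-1', b'rec-2'])
# With boto3, these entries would be sent as (stream name hypothetical):
#   kinesis.put_records(StreamName='streamalert', Records=entries)
```

Random keys maximize shard distribution, at the cost of losing any per-key ordering guarantees; use a stable key (e.g. a host ID) if ordering within a source matters.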
StreamAlert enables AWS Enhanced Monitoring to help you diagnose these types of issues via shard-level metrics. Simply go to CloudWatch -> Metrics -> Kinesis. This also allows you to measure IncomingBytes and IncomingRecords.
DescribeStream: Rate exceeded¶
Or DescribeDeliveryStream: Rate exceeded.
This API call is limited to 10 requests/s. Your agent/code should not use this API call to determine whether the Kinesis stream is available to receive data; it should simply attempt to send the data and gracefully handle any exceptions.
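The "send and handle exceptions" pattern can be sketched with a simple exponential backoff; send_fn stands in for a real producer call such as boto3's kinesis.put_records, and the RuntimeError here is a placeholder for a throttling exception like ProvisionedThroughputExceededException:

```python
# Sketch: retry throttled sends with exponential backoff instead of polling
# DescribeStream to check availability.
import time

def send_with_backoff(send_fn, records, retries=3, base_delay=0.5):
    """Retry throttled sends with exponential backoff; re-raise on exhaustion."""
    for attempt in range(retries + 1):
        try:
            return send_fn(records)
        except RuntimeError:  # placeholder for a throttling exception
            if attempt == retries:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

This keeps the producer well under the DescribeStream quota while still surviving transient throttling.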
certificate verify failed¶
Run the following command on the impacted host, choosing the correct region: openssl s_client -showcerts -connect kinesis.us-west-2.amazonaws.com:443
If this returns Verify return code: 0 (ok), your agent/code needs to use Amazon’s root and/or intermediate certificates (PEM) for TLS to function properly.
FAQ¶
Frequently Asked Questions
What language is StreamAlert written in?
The application and rules are written in Python; the infrastructure is written with Terraform
Code can be found here: https://github.com/airbnb/streamalert
What license is StreamAlert released under?
StreamAlert is released under the Apache License, Version 2.0
How much does StreamAlert cost?
StreamAlert itself is open source (free); you pay only for the underlying AWS resources you use
What/How can I send data to StreamAlert?
See Datasources
What scale does StreamAlert operate at?
StreamAlert utilizes Kinesis Streams, which can “continuously capture and store terabytes of data per hour from hundreds of thousands of sources” [1]
What’s the maintenance/operational overhead?
Limited; StreamAlert utilizes Terraform, Kinesis Streams and AWS Lambda, which means you don’t have to manually provision, manage, patch or harden any servers
Does StreamAlert support analytics, metrics or time series use-cases?
StreamAlert itself does not support analytics, metrics or time series use-cases. StreamAlert can send data to such tools or you can use one of many great open source and commercial offerings in this space, including but not limited to Prometheus, DataDog and NewRelic.
Is StreamAlert intended for synchronous (blocking) or asynchronous decision making?
StreamAlert is intended for asynchronous decision making.
Contact Us¶
Don’t see your question answered here?
Feel free to open an issue, submit a PR, and/or reach out to us on Slack
Alternatives¶
The correct choice depends on your use-cases, existing infrastructure, security requirements, available resources, core competencies, and more. The details outlined below highlight notable differences and do not constitute a complete, detailed comparison.
ElastAlert¶
Infrastructure¶
ElastAlert assumes you have an existing Elasticsearch cluster; it schedules queries against it
StreamAlert directly ingests data from S3 buckets or other sources like Fluentd, Logstash, Kinesis Agent, osquery, PHP, Java, Ruby, etc via Amazon Kinesis Data Streams
Rules/Queries¶
ElastAlert uses YAML files and Elasticsearch’s Query DSL. It supports query types that StreamAlert currently does not, ex: Change, Frequency, Spike, Flatline, New Term, Cardinality
StreamAlert uses JSON files and queries are written in Python; they can utilize any Python libraries or functions
Security¶
In ElastAlert, TLS and authentication are optional (Elasticsearch); they can be enabled via Elastic Shield/X-Pack.
StreamAlert requires TLS for data transport (Kinesis requirement) and authentication is required (AWS Identity and Access Management (IAM))
Etsy’s 411¶
Infrastructure¶
411 assumes you have an existing Elasticsearch cluster; it schedules queries against it
StreamAlert directly ingests data from S3 buckets or other sources like Fluentd, Logstash, Kinesis Agent, osquery, PHP, Java, Ruby, etc via Amazon Kinesis Data Streams
Rules/Queries¶
411 uses a custom query language called ESQuery, “Pipelined Lucene shorthand”, which is then translated to Elasticsearch’s Query DSL
StreamAlert rules/queries are written in Python; they can utilize any Python libraries or functions.
Security¶
411:
Infrastructure: Apache (w/mod_rewrite, mod_headers), PHP, SQLite, & MySQL. You are responsible for hardening and vulnerability management of these applications and the underlying host / operating system.
AuthN/AuthZ: The UI is accessed via username/password over TLS. TLS and authentication are optional for Elasticsearch; they can be enabled via Elastic Shield/X-Pack
StreamAlert:
Infrastructure: Serverless; underlying operating system is hardened and updated by Amazon. Application is Python and runs in a short-lived container/sandbox.
Requires TLS for data transport (Kinesis requirement)
AuthN/AuthZ is required (AWS Identity and Access Management (IAM))