by John Lukach
The project started as a collection of Python scripts for cloud configuration, data collection, and threat detection that lacked analysis capabilities.
Who needs another Cloud Security Posture Management (CSPM) in a crowded market with all the open source and vendor offerings?
Foolishly, I thought I did back in October 2022. Instead of waiting for updated parsers, the collection used the botocore data structures directly.
https://github.com/boto/botocore/tree/develop/botocore/data
It worked well to keep up with the AWS re:Invent releases until I ran out of steam in January 2023. In July 2023, I still required the capabilities, so I took the project in a new direction to make it more sustainable.
Enter Jupyter Notebook, which runs anywhere: Amazon SageMaker, GitHub Codespaces, or wherever I have the most system resources available. It provides the user interface so I can focus on threat hunting.
I reworked the Python library to simplify authentication, collaboration, configuration, dependency management, and Amazon Security Lake data searching, using Jupyter Notebook for analysis.
https://github.com/botoplus/botoplus
I will walk through the botoplus notebook in the GitHub repository. If you don’t have permission to make all the calls, the required information can be hard-coded into the notebook, as sketched below.
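For example, the discovery results used throughout this walkthrough could be set manually. These values are hypothetical placeholders for your environment:
# Hypothetical hard-coded values for environments where the discovery
# calls below are not permitted -- replace with your own details.
selected_account = 'SecurityLakeAdmin'    # delegated administrator account name
ingestion = ['us-east-1', 'us-east-2']    # regions where data ingestion occurs
replication = 'us-east-2'                 # central replication region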
Authentication from your Jupyter Notebook requires the installation of AWS Command Line Interface (AWS CLI) Version 2, which supports AWS IAM Identity Center, previously called AWS Single Sign-On (SSO).
import os

# Download and install AWS CLI v2, which is required for IAM Identity Center support.
os.system('curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "../awscliv2.zip"')
os.system('unzip -o ../awscliv2.zip -d ../')
os.system('cd .. && sudo ./aws/install --update')
os.system('aws --version')
I will also install the aqueduct-utility package alongside the botoplus Python library to help simplify the device authentication.
https://github.com/jblukach/aqueduct
import sys
!{sys.executable} -m pip install botoplus --upgrade
!{sys.executable} -m pip install aqueduct-utility --upgrade
I can log in to my organization by providing the Identity Store, SSO Region, SSO Role, CLI Region, and CLI Output.
import aqueduct.identity as _idp

# Prompts for the Identity Store, SSO Region, SSO Role, CLI Region, and CLI Output.
_idp.login()
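To confirm the session works, a quick sanity check (assuming botoplus.default() returns a standard boto3-style session, as it is used later in the notebook) can call AWS STS:
import botoplus.botoplus as botoplus

# Sanity check: print the ARN the current session authenticates as.
session = botoplus.default()
sts = session.client('sts')
print(sts.get_caller_identity()['Arn'])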
Amazon Security Lake requires administration of the service to be delegated to a member account. I can find the installation location by querying the organization’s management account or another delegated administrator account.
import botoplus.botoplus as botoplus

selected_account = 'Unavailable'

session = botoplus.default()
client = session.client('organizations')

# Page through every delegated administrator in the organization.
paginator = client.get_paginator('list_delegated_administrators')
pages = paginator.paginate()

for page in pages:
    for item in page['DelegatedAdministrators']:
        # Check which services each administrator account is delegated for.
        paginator2 = client.get_paginator('list_delegated_services_for_account')
        pages2 = paginator2.paginate(
            AccountId = item['Id']
        )
        for page2 in pages2:
            for item2 in page2['DelegatedServices']:
                if item2['ServicePrincipal'] == 'securitylake.amazonaws.com':
                    selected_account = item['Name']

print('Amazon Security Lake Delegated Administrator: '+selected_account)
I need to check which data sources are ingested, and in which regions, for the environment.
import botoplus.botoplus as botoplus

ingestion = []

try:
    session = botoplus.defaults(selected_account)
    securitylake = session.client('securitylake')
    regions = securitylake.get_data_lake_organization_configuration()
    # Record each region that automatically ingests data for new accounts.
    for region in regions['autoEnableNewAccount']:
        print(region['region'])
        ingestion.append(region['region'])
        for source in region['sources']:
            print(' * '+source['sourceVersion']+' '+source['sourceName'])
except Exception:
    print('Amazon Security Lake Delegated Administrator: Unidentified')
I also need to see the configuration for data replication to a central region.
import botoplus.botoplus as botoplus

replication = []

try:
    session = botoplus.defaults(selected_account)
    securitylake = session.client('securitylake')
    lakes = securitylake.list_data_lakes(
        regions = ingestion
    )
    for lake in lakes['dataLakes']:
        try:
            # Not every region replicates, so a missing key is expected.
            for region in lake['replicationConfiguration']['regions']:
                replication.append(region)
        except KeyError:
            pass
except Exception:
    print('Amazon Security Lake Ingestion Region(s): Unidentified')

try:
    # Deduplicate and keep the single central replication region.
    replication = list(set(replication))
    replication = replication[0]
    print('Amazon Security Lake Replication Region: '+str(replication))
except IndexError:
    pass
It is also good to verify the configured data retention.
import json
import botoplus.botoplus as botoplus

try:
    session = botoplus.defaults(selected_account)
    securitylake = session.client('securitylake')
    lakes = securitylake.list_data_lakes(
        regions = ingestion
    )
    for lake in lakes['dataLakes']:
        if lake['region'] == replication:
            # Pretty-print the lifecycle (retention) settings for the central region.
            print(json.dumps(lake['lifecycleConfiguration'], indent=4, sort_keys=True))
except Exception:
    print('Amazon Security Lake Ingestion Region(s): Unidentified')
We are all set to run our first search against the Amazon Route 53 Resolver query logs.
import botoplus.botoplus as botoplus
query = """
SELECT time,
metadata.product.version,
metadata.product.name AS service,
metadata.product.feature.name,
metadata.product.vendor_name,
metadata.profiles,
metadata.version AS securitylake,
src_endpoint.vpc_uid AS src_vpc_uid,
src_endpoint.ip AS src_ip,
src_endpoint.port AS src_port,
src_endpoint.instance_uid AS src_instance_uid,
query.hostname AS query_hostname,
query.type AS query_type,
query.class AS query_class,
connection_info.protocol_name,
connection_info.direction,
connection_info.direction_id,
dst_endpoint.instance_uid AS dst_instance_uid,
dst_endpoint.interface_uid AS dst_interface_uid,
severity_id,
severity,
class_name,
class_uid,
disposition,
disposition_id,
rcode_id,
rcode,
activity_id,
activity_name,
type_name,
type_uid,
unmapped,
region,
accountid,
eventday,
answers
FROM amazon_security_lake_glue_db_us_east_2.amazon_security_lake_table_us_east_2_route53_1_0
WHERE eventday BETWEEN cast(
        date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar
    )
    AND cast(
        date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar
    )
ORDER BY time DESC
"""
converted = botoplus.convert(selected_account)
session = botoplus.defaults(selected_account)
athena = session.client('athena', region_name=replication)

athena.start_query_execution(
    QueryString = query,
    ResultConfiguration = {
        'OutputLocation': 's3://temp-athena-results-'+converted['awsaccount']+'-'+replication+'/'
    }
)
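One caveat: start_query_execution returns as soon as the query is queued, not when it finishes. A minimal polling sketch, assuming botoplus.execution() returns the ID of the query just started (matching its use in the next cell), waits for a terminal state before moving on:
import time
import botoplus.botoplus as botoplus

session = botoplus.defaults(selected_account)
athena = session.client('athena', region_name=replication)

# Assumption: botoplus.execution() returns the QueryExecutionId of the
# query started above, as it is used in the next cell.
query_execution_id = botoplus.execution()

# Poll Athena until the query reaches a terminal state.
while True:
    status = athena.get_query_execution(
        QueryExecutionId = query_execution_id
    )
    state = status['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        print('Query state: '+state)
        break
    time.sleep(5)  # wait five seconds between checks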
It is best to check whether the search was successful and how large the returned data set is.
import botoplus.botoplus as botoplus

query_execution_id = botoplus.execution()

try:
    session = botoplus.defaults(selected_account)
    athena = session.client('athena', region_name=replication)
    output = athena.get_query_runtime_statistics(
        QueryExecutionId = query_execution_id
    )
    # Convert the output size from bytes to megabytes for readability.
    print(' * Megabytes: '+str(output['QueryRuntimeStatistics']['OutputStage']['OutputBytes']/1000000))
    print(' * Output Rows: '+str(output['QueryRuntimeStatistics']['OutputStage']['OutputRows']))
except Exception:
    print('Amazon Security Lake Ingestion Region(s): Unidentified')
Now, I can pull the data set over to my Jupyter Notebook for analysis.
import botoplus.botoplus as botoplus
import pandas as pd

session = botoplus.defaults(selected_account)
athena = session.client('athena', region_name=replication)

output = athena.get_query_execution(
    QueryExecutionId = query_execution_id
)

# Parse the bucket name and object key out of the s3:// output location.
bucket = output['QueryExecution']['ResultConfiguration']['OutputLocation']
out = bucket[5:].split('/')
print(bucket)

s3 = session.resource('s3')
s3.Object(out[0], out[1]).download_file('/tmp/'+out[1])

# Load the CSV results into a DataFrame and show every column of the first row.
df = pd.read_csv('/tmp/'+out[1], sep=',')
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
df.head(1)
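From here, standard pandas analysis applies. As a quick illustrative sketch using the query_hostname column alias defined in the Athena query above, the ten most frequently queried hostnames can be summarized in one line:
# Count the ten most frequently queried hostnames, using the
# query_hostname alias from the Athena query above.
df['query_hostname'].value_counts().head(10)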