0 / 12 steps complete
Advanced AI-Assisted Cloud Security

L09: Cloud Security — AWS CloudTrail Investigation

Investigate a simulated AWS breach using CloudTrail logs. Detect privilege escalation, data exfiltration, IAM abuse, and unauthorized resource creation. Use Python to parse logs at scale, build detection queries, and generate incident reports aligned with the AWS Shared Responsibility Model.

AWS CloudTrail Python 3 pandas AWS CLI jq Kali Linux
Phase 1: CloudTrail Setup & Log Acquisition
1 Install AWS CLI and configure lab environment

On Kali Linux, install the AWS CLI and Python dependencies:

sudo apt update && sudo apt install -y awscli jq python3-pip
pip3 install boto3 pandas matplotlib

mkdir ~/cloud-lab && cd ~/cloud-lab

# Configure AWS CLI with read-only credentials for the lab
# (Use a dedicated lab AWS account or provided credentials)
aws configure
# AWS Access Key ID: [your lab key]
# AWS Secret Access Key: [your lab secret]
# Default region: us-east-1
# Output format: json

# Verify access
aws sts get-caller-identity
2 Generate or download simulated CloudTrail breach logs

Generate a realistic CloudTrail log dataset containing a multi-stage AWS breach:

cat > ~/cloud-lab/generate_cloudtrail.py << 'EOF'
import json, random, datetime, uuid

def make_event(time, user, action, resource, source_ip, region='us-east-1', error=None):
    service, api = action.split(':')
    event = {
        "eventVersion": "1.09",
        "userIdentity": {
            "type": "IAMUser",
            "principalId": f"AIDA{uuid.uuid4().hex[:16].upper()}",
            "arn": f"arn:aws:iam::123456789012:user/{user}",
            "accountId": "123456789012",
            "userName": user
        },
        "eventTime": time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "eventSource": f"{service}.amazonaws.com",
        "eventName": api,
        "awsRegion": region,
        "sourceIPAddress": source_ip,
        "userAgent": "aws-cli/2.15.0 Python/3.12.0",
        "requestID": str(uuid.uuid4()),
        "eventID": str(uuid.uuid4()),
        "readOnly": api.startswith(('Get','List','Describe')),
        "resources": [{"type": "AWS::IAM::User", "ARN": resource}],
    }
    if error:
        event["errorCode"] = error[0]
        event["errorMessage"] = error[1]
    return event

events = []
base = datetime.datetime(2024, 3, 1, 8, 0, 0)
attacker_ip = "185.234.219.44"  # External IP
legitimate_ip = "10.0.1.50"

# Phase 1: Reconnaissance (legitimate-looking)
for i in range(20):
    t = base + datetime.timedelta(minutes=i*2)
    events.append(make_event(t, "dev-alice", "iam:ListUsers",
        "arn:aws:iam::123456789012:user/*", legitimate_ip))
    events.append(make_event(t+datetime.timedelta(seconds=30),
        "dev-alice", "iam:ListRoles",
        "arn:aws:iam::123456789012:role/*", legitimate_ip))

# Phase 2: Compromised credential — attacker from external IP
compromise_time = base + datetime.timedelta(hours=2)
# Attacker enumerates
for api in ["iam:ListUsers","iam:ListPolicies","iam:GetAccountSummary",
            "ec2:DescribeInstances","s3:ListBuckets","iam:ListAttachedUserPolicies"]:
    events.append(make_event(compromise_time, "dev-alice", api,
        "arn:aws:iam::123456789012:user/dev-alice", attacker_ip))
    compromise_time += datetime.timedelta(seconds=15)

# Phase 3: Privilege escalation
for api, resource in [
    ("iam:AttachUserPolicy", "arn:aws:iam::aws:policy/AdministratorAccess"),
    ("iam:CreateAccessKey", "arn:aws:iam::123456789012:user/dev-alice"),
    ("iam:PutUserPolicy", "arn:aws:iam::123456789012:user/dev-alice"),
]:
    events.append(make_event(compromise_time, "dev-alice", api, resource, attacker_ip))
    compromise_time += datetime.timedelta(seconds=30)

# Phase 4: Data exfiltration
for i in range(15):
    events.append(make_event(compromise_time, "dev-alice", "s3:GetObject",
        f"arn:aws:s3:::company-sensitive-data/confidential/file{i}.csv", attacker_ip))
    compromise_time += datetime.timedelta(seconds=10)

# Phase 5: Persistence — new admin user
for api, resource in [
    ("iam:CreateUser", "arn:aws:iam::123456789012:user/backup-svc"),
    ("iam:AttachUserPolicy", "arn:aws:iam::aws:policy/AdministratorAccess"),
    ("iam:CreateAccessKey", "arn:aws:iam::123456789012:user/backup-svc"),
    ("ec2:RunInstances", "arn:aws:ec2:us-east-1:123456789012:instance/*"),
]:
    events.append(make_event(compromise_time, "dev-alice", api, resource, attacker_ip))
    compromise_time += datetime.timedelta(seconds=45)

# Add some noise (normal activity from legit users)
for i in range(50):
    t = base + datetime.timedelta(minutes=random.randint(0, 480))
    user = random.choice(["dev-bob", "dev-charlie", "svc-deploy"])
    events.append(make_event(t, user,
        random.choice(["ec2:DescribeInstances","s3:ListBuckets","iam:GetUser"]),
        f"arn:aws:iam::123456789012:user/{user}", legitimate_ip))

events.sort(key=lambda x: x['eventTime'])
with open('cloudtrail_breach.json', 'w') as f:
    json.dump({"Records": events}, f, indent=2)
print(f"Generated {len(events)} CloudTrail events")
EOF
cd ~/cloud-lab && python3 generate_cloudtrail.py
Phase 2: Reconnaissance & Privilege Escalation Detection
3 Parse CloudTrail logs and extract key fields
cat > ~/cloud-lab/parse_cloudtrail.py << 'EOF' import json, pandas as pd with open('cloudtrail_breach.json') as f: data = json.load(f) records = [] for event in data['Records']: records.append({ 'time': event['eventTime'], 'user': event['userIdentity'].get('userName', event['userIdentity'].get('type','')), 'event_name': event['eventName'], 'event_source': event['eventSource'], 'source_ip': event['sourceIPAddress'], 'region': event['awsRegion'], 'read_only': event.get('readOnly', False), 'error_code': event.get('errorCode', ''), 'resource': event['resources'][0]['ARN'] if event.get('resources') else '', }) df = pd.DataFrame(records) df['time'] = pd.to_datetime(df['time']) df['hour'] = df['time'].dt.hour print(f"Total events: {len(df)}") print(f"\nEvents by user:\n{df['user'].value_counts()}") print(f"\nEvents by source IP:\n{df['source_ip'].value_counts().head()}") print(f"\nUnique API calls: {df['event_name'].nunique()}") df.to_csv('cloudtrail_parsed.csv', index=False) print("\nSaved to cloudtrail_parsed.csv") EOF python3 parse_cloudtrail.py
4 Detect IAM reconnaissance and enumeration

Identify accounts performing excessive IAM read operations — a common pre-escalation step:

python3 << 'EOF'
import pandas as pd

df = pd.read_csv('cloudtrail_parsed.csv', parse_dates=['time'])

# IAM enumeration APIs
iam_recon_apis = ['ListUsers','ListRoles','ListPolicies','GetAccountSummary',
                   'ListAttachedUserPolicies','ListGroupsForUser','GetUser',
                   'ListAccessKeys','GetPolicy','ListAccountAliases']

# Flag: multiple distinct IAM read APIs from same IP in short window
recon = df[df['event_name'].isin(iam_recon_apis)].copy()
recon_summary = recon.groupby(['source_ip', 'user']).agg(
    api_count=('event_name', 'count'),
    unique_apis=('event_name', 'nunique'),
    first_seen=('time', 'min'),
    last_seen=('time', 'max')
).reset_index()
recon_summary['duration_min'] = (
    recon_summary['last_seen'] - recon_summary['first_seen']
).dt.total_seconds() / 60

# Flag: many distinct APIs from external IP
flagged = recon_summary[recon_summary['unique_apis'] >= 4]
print("IAM Reconnaissance Detection:")
print(flagged.to_string(index=False))
EOF
5 Detect privilege escalation via IAM policy attachment

Flag AttachUserPolicy / PutUserPolicy calls granting AdministratorAccess:

python3 << 'EOF'
import pandas as pd

df = pd.read_csv('cloudtrail_parsed.csv', parse_dates=['time'])

# Privilege escalation indicators
privesc_apis = ['AttachUserPolicy', 'PutUserPolicy', 'AttachRolePolicy',
                'CreatePolicyVersion', 'SetDefaultPolicyVersion',
                'AddUserToGroup', 'CreateLoginProfile', 'UpdateLoginProfile']

privesc = df[df['event_name'].isin(privesc_apis)].copy()

print(f"[!] Privilege escalation API calls detected: {len(privesc)}")
for _, row in privesc.iterrows():
    print(f"\n  Time:    {row['time']}")
    print(f"  User:    {row['user']}")
    print(f"  API:     {row['event_name']}")
    print(f"  Resource:{row['resource']}")
    print(f"  Src IP:  {row['source_ip']}")

    # Check for AdministratorAccess policy attachment
    if 'AdministratorAccess' in str(row['resource']):
        print(f"  [CRITICAL] AdministratorAccess policy attached!")
        print(f"  MITRE: T1098 — Account Manipulation")
EOF
Phase 3: Data Exfiltration & Persistence Detection
6 Detect S3 data exfiltration patterns
python3 << 'EOF'
import pandas as pd
from datetime import timedelta

df = pd.read_csv('cloudtrail_parsed.csv', parse_dates=['time'])

# S3 GetObject spikes (>10 objects in 5 minutes from external IP)
s3_gets = df[df['event_name'] == 'GetObject'].copy()

results = []
for ip, group in s3_gets.groupby('source_ip'):
    group = group.sort_values('time')
    for i, row in group.iterrows():
        window = group[(group['time'] >= row['time']) &
                       (group['time'] <= row['time'] + timedelta(minutes=5))]
        if len(window) >= 10:
            results.append({
                'source_ip': ip,
                'user': row['user'],
                'start': row['time'].isoformat(),
                'objects_accessed': len(window),
                'buckets': list(window['resource'].str.extract(r'arn:aws:s3:::([^/]+)')[0].unique()),
            })
            break

if results:
    for r in results:
        print(f"\n[ALERT] S3 Mass Download Detected!")
        print(f"  Source IP:   {r['source_ip']}")
        print(f"  User:        {r['user']}")
        print(f"  Time:        {r['start']}")
        print(f"  Objects:     {r['objects_accessed']} in 5 min")
        print(f"  Buckets:     {r['buckets']}")
        print(f"  MITRE: T1530 — Data from Cloud Storage Object")
else:
    print("No S3 exfiltration detected in this window")
EOF
7 Detect persistence via rogue IAM user creation
python3 << 'EOF'
import pandas as pd

df = pd.read_csv('cloudtrail_parsed.csv', parse_dates=['time'])

# Persistence: CreateUser → AttachUserPolicy (Admin) → CreateAccessKey
# happening in rapid sequence
persistence_apis = ['CreateUser', 'CreateAccessKey', 'AttachUserPolicy']
persist = df[df['event_name'].isin(persistence_apis)].copy()

print(f"Persistence-related API calls: {len(persist)}")
print()
for _, row in persist.iterrows():
    alert_level = "CRITICAL" if row['event_name'] in ['CreateUser','CreateAccessKey'] else "HIGH"
    print(f"[{alert_level}] {row['time']} | {row['user']}@{row['source_ip']}")
    print(f"         {row['event_name']} → {row['resource']}")

# Also check for unauthorized EC2 instance launches (crypto mining, C2 hosting)
ec2_launches = df[df['event_name'] == 'RunInstances']
if len(ec2_launches) > 0:
    print(f"\n[CRITICAL] Unauthorized EC2 launch detected:")
    for _, row in ec2_launches.iterrows():
        print(f"  {row['time']} | {row['user']}@{row['source_ip']}")
        print(f"  MITRE: T1578.002 — Create Cloud Instance")
EOF
8 Reconstruct the full attack timeline
python3 << 'EOF' import pandas as pd df = pd.read_csv('cloudtrail_parsed.csv', parse_dates=['time']) ATTACKER_IP = "185.234.219.44" # Filter to attacker IP and sort chronologically attacker_events = df[df['source_ip'] == ATTACKER_IP].sort_values('time') print("=" * 70) print("ATTACK TIMELINE RECONSTRUCTION") print("=" * 70) phases = { 'Reconnaissance': ['ListUsers','ListRoles','ListPolicies','GetAccountSummary', 'DescribeInstances','ListBuckets','ListAttachedUserPolicies'], 'Privilege Escalation': ['AttachUserPolicy','PutUserPolicy','CreateAccessKey'], 'Data Exfiltration': ['GetObject'], 'Persistence': ['CreateUser','RunInstances'], } for _, row in attacker_events.iterrows(): phase = 'Other' for p, apis in phases.items(): if row['event_name'] in apis: phase = p; break print(f"[{row['time'].strftime('%H:%M:%S')}] [{phase:22}] {row['event_name']:<30} {row['user']}") print(f"\nTotal attacker events: {len(attacker_events)}") print(f"Compromised account: {attacker_events['user'].mode()[0]}") EOF
Phase 4: Detection Rules & Incident Response
9 Write AWS CloudWatch detection queries
# CloudWatch Logs Insights query — privilege escalation detection # Run these in AWS Console → CloudWatch → Log Insights → Select CloudTrail log group # Query 1: IAM privilege escalation fields eventTime, userIdentity.userName, eventName, requestParameters.policyArn | filter eventName in ["AttachUserPolicy", "PutUserPolicy", "AttachRolePolicy", "CreatePolicyVersion"] | filter requestParameters.policyArn like /AdministratorAccess/ | sort eventTime desc | limit 50 # Query 2: External IP accessing sensitive APIs fields eventTime, userIdentity.userName, eventName, sourceIPAddress | filter sourceIPAddress not like /^10\./ and sourceIPAddress not like /^192\.168\./ | filter eventName in ["CreateUser","CreateAccessKey","DeleteTrail","PutBucketPolicy","GetObject"] | sort eventTime desc | limit 100 # Query 3: Mass S3 data access fields eventTime, userIdentity.userName, sourceIPAddress, requestParameters.bucketName | filter eventName = "GetObject" | stats count(*) as access_count by userIdentity.userName, sourceIPAddress, bin(5m) | filter access_count > 10 | sort access_count desc
10 Simulate containment actions via AWS CLI

Document the containment commands (run only in your lab account):

# 1. Disable compromised user's access keys aws iam list-access-keys --user-name dev-alice aws iam update-access-key --user-name dev-alice \ --access-key-id AKIA... --status Inactive # 2. Attach deny-all policy to compromised account aws iam put-user-policy --user-name dev-alice \ --policy-name EmergencyDeny \ --policy-document '{ "Version":"2012-10-17", "Statement":[{"Effect":"Deny","Action":"*","Resource":"*"}] }' # 3. Remove unauthorized admin policy from compromised user aws iam detach-user-policy --user-name dev-alice \ --policy-arn arn:aws:iam::aws:policy/AdministratorAccess # 4. Delete rogue persistence user aws iam delete-access-key --user-name backup-svc --access-key-id AKIA... aws iam detach-user-policy --user-name backup-svc \ --policy-arn arn:aws:iam::aws:policy/AdministratorAccess aws iam delete-user --user-name backup-svc # 5. Terminate unauthorized EC2 instance aws ec2 terminate-instances --instance-ids i-0abc123def456789 # 6. Enable CloudTrail if disabled aws cloudtrail start-logging --name my-trail echo "Containment actions complete"
11 Map findings to MITRE ATT&CK Cloud matrix
PhaseActionATT&CK Technique
ReconListUsers, ListRoles, ListPoliciesT1526 — Cloud Service Discovery
Priv EscAttachUserPolicy (Admin)T1098.003 — Add AWS Credentials
ExfilS3 GetObject (15 objects)T1530 — Data from Cloud Storage
PersistCreateUser + AdminPolicyT1136.003 — Create Cloud Account
PersistRunInstancesT1578.002 — Create Cloud Instance
12 Generate executive incident report

Lab Findings

FindingDetails
Compromised account
Attacker IP
Initial access time
Privilege escalation method
Data exfiltrated
Persistence mechanism

Next: Lab L10 — Deepfake & Social Engineering Defense

Detect AI-generated deepfakes and build defenses against social engineering attacks.

Start L10 →
AI Cloud Security Analyst

Enter your Anthropic API key to activate the AI analyst:

Quick Prompts: