Investigate a simulated AWS breach using CloudTrail logs. Detect privilege escalation, data exfiltration, IAM abuse, and unauthorized resource creation. Use Python to parse logs at scale, build detection queries, and generate incident reports aligned with the AWS Shared Responsibility Model.
On Kali Linux, install the AWS CLI and Python dependencies:
# Install the AWS CLI, jq and pip from the distribution packages
sudo apt update && sudo apt install -y awscli jq python3-pip
# Python libraries used by the lab scripts
# NOTE(review): recent Kali releases may reject system-wide pip installs (PEP 668);
# if so, create a virtual environment first — confirm on your image.
pip3 install boto3 pandas matplotlib
# -p keeps the command idempotent: a plain mkdir fails on re-run and the && would skip the cd
mkdir -p ~/cloud-lab && cd ~/cloud-lab
# Configure AWS CLI with read-only credentials for the lab
# (Use a dedicated lab AWS account or provided credentials)
aws configure
# AWS Access Key ID: [your lab key]
# AWS Secret Access Key: [your lab secret]
# Default region: us-east-1
# Output format: json
# Verify access
aws sts get-caller-identity
Generate a realistic CloudTrail log dataset containing a multi-stage AWS breach:
cat > ~/cloud-lab/generate_cloudtrail.py << 'EOF'
import json, random, datetime, uuid
# CloudTrail resource type for each (service, ARN resource kind) this generator emits.
_RESOURCE_TYPES = {
    ("iam", "user"): "AWS::IAM::User",
    ("iam", "role"): "AWS::IAM::Role",
    ("iam", "policy"): "AWS::IAM::Policy",
    ("ec2", "instance"): "AWS::EC2::Instance",
}


def _resource_type(arn):
    """Map a resource ARN to a CloudTrail resource type string.

    ARNs that are malformed or not in the lookup table fall back to
    "AWS::IAM::User".
    """
    parts = arn.split(":", 5)
    if len(parts) < 6:
        return "AWS::IAM::User"
    service, tail = parts[2], parts[5]
    if service == "s3":
        # S3 ARNs have no resource-kind prefix: "bucket/key" is an object, "bucket" a bucket.
        return "AWS::S3::Object" if "/" in tail else "AWS::S3::Bucket"
    return _RESOURCE_TYPES.get((service, tail.split("/", 1)[0]), "AWS::IAM::User")


def make_event(time, user, action, resource, source_ip, region='us-east-1', error=None):
    """Build one CloudTrail-style event record as a plain dict.

    Args:
        time: datetime of the call; formatted as ``YYYY-MM-DDTHH:MM:SSZ`` with
            no timezone conversion.
        user: IAM user name placed in ``userIdentity``.
        action: ``"service:ApiName"`` string, e.g. ``"iam:ListUsers"``.
        resource: ARN recorded as the single entry of ``resources``.
        source_ip: value stored in ``sourceIPAddress``.
        region: value stored in ``awsRegion``.
        error: optional ``(errorCode, errorMessage)`` pair; when given, both
            fields are added to the event.

    Returns:
        A JSON-serialisable dict describing the event.

    Raises:
        ValueError: if *action* does not contain exactly one ``:``.
    """
    service, api = action.split(':')
    user_arn = f"arn:aws:iam::123456789012:user/{user}"
    # Derive the principal ID from the user ARN so the same IAM user keeps the
    # same principalId across events (a random ID per event would make one
    # user look like many principals).
    principal_id = f"AIDA{uuid.uuid5(uuid.NAMESPACE_URL, user_arn).hex[:16].upper()}"
    event = {
        "eventVersion": "1.09",
        "userIdentity": {
            "type": "IAMUser",
            "principalId": principal_id,
            "arn": user_arn,
            "accountId": "123456789012",
            "userName": user
        },
        "eventTime": time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "eventSource": f"{service}.amazonaws.com",
        "eventName": api,
        "awsRegion": region,
        "sourceIPAddress": source_ip,
        "userAgent": "aws-cli/2.15.0 Python/3.12.0",
        "requestID": str(uuid.uuid4()),
        "eventID": str(uuid.uuid4()),
        # Read-only is inferred purely from the API name prefix.
        "readOnly": api.startswith(('Get', 'List', 'Describe')),
        "resources": [{"type": _resource_type(resource), "ARN": resource}],
    }
    if error:
        event["errorCode"] = error[0]
        event["errorMessage"] = error[1]
    return event
# Fixed seed: the noise events below are random, and seeding keeps the
# generated dataset identical from run to run so lab results are reproducible.
random.seed(20240301)

events = []
base = datetime.datetime(2024, 3, 1, 8, 0, 0)
attacker_ip = "185.234.219.44"  # External IP
legitimate_ip = "10.0.1.50"

# Phase 1: Reconnaissance (legitimate-looking)
# 20 ListUsers/ListRoles pairs, two minutes apart, from the internal address.
for i in range(20):
    t = base + datetime.timedelta(minutes=i*2)
    events.append(make_event(t, "dev-alice", "iam:ListUsers",
                             "arn:aws:iam::123456789012:user/*", legitimate_ip))
    events.append(make_event(t + datetime.timedelta(seconds=30),
                             "dev-alice", "iam:ListRoles",
                             "arn:aws:iam::123456789012:role/*", legitimate_ip))

# Phase 2: Compromised credential — attacker from external IP
# compromise_time is a running clock advanced after every attacker event.
compromise_time = base + datetime.timedelta(hours=2)

# Attacker enumerates
for api in ["iam:ListUsers", "iam:ListPolicies", "iam:GetAccountSummary",
            "ec2:DescribeInstances", "s3:ListBuckets", "iam:ListAttachedUserPolicies"]:
    events.append(make_event(compromise_time, "dev-alice", api,
                             "arn:aws:iam::123456789012:user/dev-alice", attacker_ip))
    compromise_time += datetime.timedelta(seconds=15)

# Phase 3: Privilege escalation
for api, resource in [
    ("iam:AttachUserPolicy", "arn:aws:iam::aws:policy/AdministratorAccess"),
    ("iam:CreateAccessKey", "arn:aws:iam::123456789012:user/dev-alice"),
    ("iam:PutUserPolicy", "arn:aws:iam::123456789012:user/dev-alice"),
]:
    events.append(make_event(compromise_time, "dev-alice", api, resource, attacker_ip))
    compromise_time += datetime.timedelta(seconds=30)

# Phase 4: Data exfiltration
# 15 GetObject calls ten seconds apart against the same bucket.
for i in range(15):
    events.append(make_event(compromise_time, "dev-alice", "s3:GetObject",
                             f"arn:aws:s3:::company-sensitive-data/confidential/file{i}.csv",
                             attacker_ip))
    compromise_time += datetime.timedelta(seconds=10)

# Phase 5: Persistence — new admin user
for api, resource in [
    ("iam:CreateUser", "arn:aws:iam::123456789012:user/backup-svc"),
    ("iam:AttachUserPolicy", "arn:aws:iam::aws:policy/AdministratorAccess"),
    ("iam:CreateAccessKey", "arn:aws:iam::123456789012:user/backup-svc"),
    ("ec2:RunInstances", "arn:aws:ec2:us-east-1:123456789012:instance/*"),
]:
    events.append(make_event(compromise_time, "dev-alice", api, resource, attacker_ip))
    compromise_time += datetime.timedelta(seconds=45)

# Add some noise (normal activity from legit users)
# Each noise event lands at a random minute within eight hours of `base`.
for i in range(50):
    t = base + datetime.timedelta(minutes=random.randint(0, 480))
    user = random.choice(["dev-bob", "dev-charlie", "svc-deploy"])
    events.append(make_event(t, user,
                             random.choice(["ec2:DescribeInstances", "s3:ListBuckets", "iam:GetUser"]),
                             f"arn:aws:iam::123456789012:user/{user}", legitimate_ip))

# eventTime is a fixed-width ISO-style string, so a string sort is chronological.
events.sort(key=lambda x: x['eventTime'])

with open('cloudtrail_breach.json', 'w') as f:
    json.dump({"Records": events}, f, indent=2)

print(f"Generated {len(events)} CloudTrail events")
EOF
cd ~/cloud-lab && python3 generate_cloudtrail.py
cat > ~/cloud-lab/parse_cloudtrail.py << 'EOF'
import json

import pandas as pd

# Load the CloudTrail export; the events live under the top-level "Records" key.
with open('cloudtrail_breach.json') as f:
    data = json.load(f)

# Flatten each nested event into one row of the fields the detections use.
records = []
for event in data['Records']:
    identity = event['userIdentity']
    # Only the first resource is kept. Entries without an "ARN" key yield ''
    # instead of raising KeyError.
    resources = event.get('resources') or [{}]
    records.append({
        'time': event['eventTime'],
        # Fall back to the identity type when the event carries no userName.
        'user': identity.get('userName', identity.get('type', '')),
        'event_name': event['eventName'],
        'event_source': event['eventSource'],
        'source_ip': event['sourceIPAddress'],
        'region': event['awsRegion'],
        'read_only': event.get('readOnly', False),
        'error_code': event.get('errorCode', ''),
        'resource': resources[0].get('ARN', ''),
    })

df = pd.DataFrame(records)
df['time'] = pd.to_datetime(df['time'])
df['hour'] = df['time'].dt.hour

print(f"Total events: {len(df)}")
print(f"\nEvents by user:\n{df['user'].value_counts()}")
print(f"\nEvents by source IP:\n{df['source_ip'].value_counts().head()}")
print(f"\nUnique API calls: {df['event_name'].nunique()}")

# The detection scripts read this CSV rather than re-parsing the JSON.
df.to_csv('cloudtrail_parsed.csv', index=False)
print("\nSaved to cloudtrail_parsed.csv")
EOF
python3 parse_cloudtrail.py
Identify accounts performing excessive IAM read operations — a common pre-escalation step:
python3 << 'EOF'
import ipaddress

import pandas as pd

df = pd.read_csv('cloudtrail_parsed.csv', parse_dates=['time'])

# IAM enumeration APIs
iam_recon_apis = ['ListUsers', 'ListRoles', 'ListPolicies', 'GetAccountSummary',
                  'ListAttachedUserPolicies', 'ListGroupsForUser', 'GetUser',
                  'ListAccessKeys', 'GetPolicy', 'ListAccountAliases']

# Minimum number of distinct enumeration APIs before an (IP, user) pair is flagged.
MIN_DISTINCT_APIS = 4


def is_external(ip):
    """Return True when *ip* parses as an IP address outside the private ranges.

    Values that are not IP addresses at all (for example an AWS service
    hostname) are reported as not external.
    """
    try:
        return not ipaddress.ip_address(ip).is_private
    except ValueError:
        return False


# Restrict to the IAM event source: several of these API names also exist in
# other AWS services and would otherwise be counted as IAM reconnaissance.
recon = df[(df['event_source'] == 'iam.amazonaws.com')
           & df['event_name'].isin(iam_recon_apis)].copy()

recon_summary = recon.groupby(['source_ip', 'user']).agg(
    api_count=('event_name', 'count'),
    unique_apis=('event_name', 'nunique'),
    first_seen=('time', 'min'),
    last_seen=('time', 'max')
).reset_index()

# duration_min and external_ip are context for the analyst; they are reported
# alongside each row but are not part of the flagging condition.
recon_summary['duration_min'] = (
    recon_summary['last_seen'] - recon_summary['first_seen']
).dt.total_seconds() / 60
recon_summary['external_ip'] = recon_summary['source_ip'].map(is_external)

# Flag: many distinct IAM read APIs from the same IP/user pair
flagged = recon_summary[recon_summary['unique_apis'] >= MIN_DISTINCT_APIS]

print("IAM Reconnaissance Detection:")
print(flagged.to_string(index=False))
EOF
Flag AttachUserPolicy / PutUserPolicy calls granting AdministratorAccess:
python3 << 'EOF'
import pandas as pd

df = pd.read_csv('cloudtrail_parsed.csv', parse_dates=['time'])

# Privilege escalation indicators
privesc_apis = ['AttachUserPolicy', 'PutUserPolicy', 'AttachRolePolicy',
                'CreatePolicyVersion', 'SetDefaultPolicyVersion',
                'AddUserToGroup', 'CreateLoginProfile', 'UpdateLoginProfile']

privesc = df[df['event_name'].isin(privesc_apis)].copy()
print(f"[!] Privilege escalation API calls detected: {len(privesc)}")

# Print one detail record per matching call.
for _, row in privesc.iterrows():
    print(f"\n Time: {row['time']}")
    print(f" User: {row['user']}")
    print(f" API: {row['event_name']}")
    print(f" Resource:{row['resource']}")
    print(f" Src IP: {row['source_ip']}")
    # Check for AdministratorAccess policy attachment
    # str() guards against a missing resource, which pandas reads back as NaN.
    if 'AdministratorAccess' in str(row['resource']):
        print(" [CRITICAL] AdministratorAccess policy attached!")
        print(" MITRE: T1098 — Account Manipulation")
EOF
python3 << 'EOF'
from datetime import timedelta

import pandas as pd

df = pd.read_csv('cloudtrail_parsed.csv', parse_dates=['time'])

# S3 GetObject spikes: at least MIN_OBJECTS reads within WINDOW from one source IP
WINDOW = timedelta(minutes=5)
MIN_OBJECTS = 10

s3_gets = df[df['event_name'] == 'GetObject'].copy()

results = []
for ip, group in s3_gets.groupby('source_ip'):
    group = group.sort_values('time')
    # Slide a forward-looking window that starts at each event in turn.
    for _, row in group.iterrows():
        window = group[(group['time'] >= row['time']) &
                       (group['time'] <= row['time'] + WINDOW)]
        if len(window) >= MIN_OBJECTS:
            # Grouping is by IP only, so report every user seen in the window
            # rather than just the user of the first event.
            users = sorted(window['user'].dropna().astype(str).unique())
            # dropna() removes resources that are not S3 ARNs (no regex match).
            buckets = window['resource'].str.extract(r'arn:aws:s3:::([^/]+)')[0].dropna().unique()
            results.append({
                'source_ip': ip,
                'user': ', '.join(users),
                'start': row['time'].isoformat(),
                'objects_accessed': len(window),
                'buckets': list(buckets),
            })
            break  # first qualifying window per IP is enough for one alert

if results:
    for r in results:
        print("\n[ALERT] S3 Mass Download Detected!")
        print(f" Source IP: {r['source_ip']}")
        print(f" User: {r['user']}")
        print(f" Time: {r['start']}")
        print(f" Objects: {r['objects_accessed']} in 5 min")
        print(f" Buckets: {r['buckets']}")
        print(" MITRE: T1530 — Data from Cloud Storage Object")
else:
    print("No S3 exfiltration detected in this window")
EOF
python3 << 'EOF'
import pandas as pd

df = pd.read_csv('cloudtrail_parsed.csv', parse_dates=['time'])

# Persistence: CreateUser → AttachUserPolicy (Admin) → CreateAccessKey
# This lists every call to those APIs in file order; it does not itself verify
# that they occurred as a sequence, so read the timestamps to confirm.
persistence_apis = ['CreateUser', 'CreateAccessKey', 'AttachUserPolicy']
persist = df[df['event_name'].isin(persistence_apis)].copy()

print(f"Persistence-related API calls: {len(persist)}")
print()
for _, row in persist.iterrows():
    # New identities and new credentials rank above policy attachment.
    alert_level = "CRITICAL" if row['event_name'] in ['CreateUser', 'CreateAccessKey'] else "HIGH"
    print(f"[{alert_level}] {row['time']} | {row['user']}@{row['source_ip']}")
    print(f" {row['event_name']} → {row['resource']}")

# Also check for unauthorized EC2 instance launches (crypto mining, C2 hosting)
ec2_launches = df[df['event_name'] == 'RunInstances']
if not ec2_launches.empty:
    print("\n[CRITICAL] Unauthorized EC2 launch detected:")
    for _, row in ec2_launches.iterrows():
        print(f" {row['time']} | {row['user']}@{row['source_ip']}")
    print(" MITRE: T1578.002 — Create Cloud Instance")
EOF
python3 << 'EOF'
import pandas as pd

df = pd.read_csv('cloudtrail_parsed.csv', parse_dates=['time'])

ATTACKER_IP = "185.234.219.44"

# Filter to attacker IP and sort chronologically
attacker_events = df[df['source_ip'] == ATTACKER_IP].sort_values('time')

print("=" * 70)
print("ATTACK TIMELINE RECONSTRUCTION")
print("=" * 70)

# Phase label for each API name; an API is assigned to the first phase listing it.
phases = {
    'Reconnaissance': ['ListUsers', 'ListRoles', 'ListPolicies', 'GetAccountSummary',
                       'DescribeInstances', 'ListBuckets', 'ListAttachedUserPolicies'],
    'Privilege Escalation': ['AttachUserPolicy', 'PutUserPolicy', 'CreateAccessKey'],
    'Data Exfiltration': ['GetObject'],
    'Persistence': ['CreateUser', 'RunInstances'],
}

for _, row in attacker_events.iterrows():
    phase = next((p for p, apis in phases.items() if row['event_name'] in apis), 'Other')
    print(f"[{row['time'].strftime('%H:%M:%S')}] [{phase:22}] {row['event_name']:<30} {row['user']}")

print(f"\nTotal attacker events: {len(attacker_events)}")
# mode() is empty when there are no attacker events (or no user values);
# indexing it unconditionally would raise.
top_users = attacker_events['user'].mode()
if top_users.empty:
    print("Compromised account: unknown (no events from attacker IP)")
else:
    print(f"Compromised account: {top_users.iloc[0]}")
EOF
# CloudWatch Logs Insights queries — breach detection
# Run these in AWS Console → CloudWatch → Logs Insights → Select CloudTrail log group
# Each query below is run on its own.

# Query 1: IAM privilege escalation
fields eventTime, userIdentity.userName, eventName, requestParameters.policyArn
| filter eventName in ["AttachUserPolicy", "PutUserPolicy", "AttachRolePolicy", "CreatePolicyVersion"]
| filter requestParameters.policyArn like /AdministratorAccess/
| sort eventTime desc
| limit 50

# Query 2: External IP accessing sensitive APIs
# All three RFC 1918 private ranges (10/8, 172.16/12, 192.168/16) are treated as internal.
fields eventTime, userIdentity.userName, eventName, sourceIPAddress
| filter sourceIPAddress not like /^10\./ and sourceIPAddress not like /^192\.168\./ and sourceIPAddress not like /^172\.(1[6-9]|2[0-9]|3[01])\./
| filter eventName in ["CreateUser","CreateAccessKey","DeleteTrail","PutBucketPolicy","GetObject"]
| sort eventTime desc
| limit 100

# Query 3: Mass S3 data access
fields eventTime, userIdentity.userName, sourceIPAddress, requestParameters.bucketName
| filter eventName = "GetObject"
| stats count(*) as access_count by userIdentity.userName, sourceIPAddress, bin(5m)
| filter access_count > 10
| sort access_count desc
Document the containment commands (run only in your lab account):
# 1. Disable ALL of the compromised user's access keys
#    (the attacker created an additional key, so deactivating a single key is not enough)
for key_id in $(aws iam list-access-keys --user-name dev-alice \
    --query 'AccessKeyMetadata[].AccessKeyId' --output text); do
  aws iam update-access-key --user-name dev-alice \
    --access-key-id "$key_id" --status Inactive
done
# 2. Attach deny-all policy to compromised account
aws iam put-user-policy --user-name dev-alice \
  --policy-name EmergencyDeny \
  --policy-document '{
"Version":"2012-10-17",
"Statement":[{"Effect":"Deny","Action":"*","Resource":"*"}]
}'
# 3. Remove unauthorized admin policy from compromised user
aws iam detach-user-policy --user-name dev-alice \
  --policy-arn arn:aws:iam::aws:policy/AdministratorAccess
#    List inline policies so any policy added by the attacker (PutUserPolicy) can be
#    reviewed and removed with `aws iam delete-user-policy`; EmergencyDeny must stay.
aws iam list-user-policies --user-name dev-alice
# 4. Delete rogue persistence user
#    (IAM refuses to delete a user that still has access keys or attached policies)
for key_id in $(aws iam list-access-keys --user-name backup-svc \
    --query 'AccessKeyMetadata[].AccessKeyId' --output text); do
  aws iam delete-access-key --user-name backup-svc --access-key-id "$key_id"
done
aws iam detach-user-policy --user-name backup-svc \
  --policy-arn arn:aws:iam::aws:policy/AdministratorAccess
aws iam delete-user --user-name backup-svc
# 5. Terminate unauthorized EC2 instance
#    (placeholder ID — replace with the instance launched by the attacker)
aws ec2 terminate-instances --instance-ids i-0abc123def456789
# 6. Enable CloudTrail if disabled
aws cloudtrail start-logging --name my-trail
echo "Containment actions complete"
| Phase | Action | ATT&CK Technique |
|---|---|---|
| Recon | ListUsers, ListRoles, ListPolicies | T1526 — Cloud Service Discovery |
| Priv Esc | AttachUserPolicy (Admin) | T1098.003 — Additional Cloud Roles |
| Exfil | S3 GetObject (15 objects) | T1530 — Data from Cloud Storage |
| Persist | CreateUser + AdminPolicy | T1136.003 — Create Cloud Account |
| Persist | RunInstances | T1578.002 — Create Cloud Instance |
| Finding | Details |
|---|---|
| Compromised account | |
| Attacker IP | |
| Initial access time | |
| Privilege escalation method | |
| Data exfiltrated | |
| Persistence mechanism | |