Intermediate AI-Assisted Threat Detection

L07: User & Entity Behavior Analytics (UEBA)

Build a UEBA system that profiles normal user behavior from Windows Event Logs and flags anomalies — after-hours logins, privilege escalation, lateral movement, and insider threat patterns. Use Python, pandas, and scikit-learn on SIFT Workstation.

Tools: Python 3, pandas, scikit-learn, SIFT Workstation, Windows Event Logs, python-evtx
Phase 1: Log Collection & Parsing
1 Set up UEBA environment on SIFT

On SIFT Workstation (192.168.56.20), install required Python packages:

sudo apt update && sudo apt install -y python3-pip python3-venv
python3 -m venv ~/ueba-lab
source ~/ueba-lab/bin/activate
pip install pandas numpy scikit-learn matplotlib seaborn python-evtx pywinrm

mkdir ~/ueba-lab/logs ~/ueba-lab/models ~/ueba-lab/reports

Download a sample Windows Security Event Log dataset:

cd ~/ueba-lab/logs
# Download an EVTX sample from the EVTX-ATTACK-SAMPLES repository
wget -O Security.evtx \
  "https://github.com/sbousseaden/EVTX-ATTACK-SAMPLES/raw/master/Credential%20Access/credential_access_lsass_access_meterpreter.evtx"

# Or generate synthetic log data for the lab
python3 -c "
import json, random, datetime
users = ['alice', 'bob', 'charlie', 'admin', 'svc_backup']
events = []
# Midnight base, so the sampled hour below is the hour of day
base = datetime.datetime(2024, 1, 15, 0, 0, 0)
for i in range(5000):
    user = random.choices(users, weights=[40,30,20,8,2])[0]
    hour = random.gauss(10, 2) if user != 'admin' else random.gauss(14, 4)
    hour = max(0, min(23, int(hour)))
    events.append({
        'timestamp': (base + datetime.timedelta(
            days=random.randint(0,30), hours=hour,
            minutes=random.randint(0,59))).isoformat(),
        'user': user,
        'event_id': random.choices([4624,4625,4648,4672,4768],[50,10,5,3,2])[0],
        'logon_type': random.choice([2,3,7,10]),
        'source_ip': f'192.168.56.{random.randint(10,50)}',
        'workstation': f'WS{random.randint(1,20):02d}'
    })
with open('security_events.json', 'w') as f:
    json.dump(events, f)
print(f'Generated {len(events)} events')
"
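The later steps read `security_events.json`, so a downloaded EVTX file would first have to be converted to the same flat schema. The following is a minimal sketch of that conversion, not part of the original lab: the helper names `record_xml_to_event` and `evtx_to_json` are hypothetical, and the `EventData` field names (`TargetUserName`, `LogonType`, `IpAddress`, `WorkstationName`) are those of logon events such as 4624. Other event types, including records in the sample above, may lack them, in which case the fields come back empty.

```python
import json
import xml.etree.ElementTree as ET

# XML namespace used by Windows event records
NS = {"e": "http://schemas.microsoft.com/win/2004/08/events/event"}

def record_xml_to_event(xml_text):
    """Convert one event record (XML string) into the lab's flat event dict."""
    root = ET.fromstring(xml_text)
    system = root.find("e:System", NS)
    # EventData holds name/value pairs such as TargetUserName and IpAddress
    data = {d.get("Name"): (d.text or "")
            for d in root.findall("e:EventData/e:Data", NS)}
    logon_type = data.get("LogonType", "")
    return {
        "timestamp": system.find("e:TimeCreated", NS).get("SystemTime"),
        "user": data.get("TargetUserName", ""),
        "event_id": int(system.find("e:EventID", NS).text),
        "logon_type": int(logon_type) if logon_type.isdigit() else None,
        "source_ip": data.get("IpAddress", ""),
        "workstation": data.get("WorkstationName", ""),
    }

def evtx_to_json(evtx_path, json_path):
    """Read every record from an EVTX file and write the events as JSON."""
    import Evtx.Evtx as evtx  # provided by the python-evtx package
    with evtx.Evtx(evtx_path) as log:
        events = [record_xml_to_event(r.xml()) for r in log.records()]
    with open(json_path, "w") as f:
        json.dump(events, f)
    return len(events)
```

Assuming the file was saved as shown above, `evtx_to_json('Security.evtx', 'security_events.json')` run from `~/ueba-lab/logs` would produce input for the parsing step.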
2 Parse Windows Event Log key Event IDs

Key Event IDs for UEBA analysis:

Event ID    Description                    UEBA Relevance
4624        Successful logon               Login time baseline
4625        Failed logon                   Brute force, wrong creds
4648        Explicit credential logon      Pass-the-Hash, lateral movement
4672        Special privilege assigned     Privilege escalation
4768/4769   Kerberos TGT/service ticket    Kerberoasting
4776        NTLM auth attempt              Legacy auth, PtH

cat > ~/ueba-lab/parse_events.py << 'EOF'
import json, pandas as pd

with open('logs/security_events.json') as f:
    events = json.load(f)

df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek  # 0=Mon, 6=Sun
df['is_weekend'] = df['day_of_week'].isin([5,6]).astype(int)
df['is_after_hours'] = ((df['hour'] < 7) | (df['hour'] >= 19)).astype(int)  # outside 07:00-19:00
df['date'] = df['timestamp'].dt.date

print("Event distribution:")
print(df['event_id'].value_counts())
print(f"\nTotal events: {len(df)}")
print(f"Unique users: {df['user'].nunique()}")
print(f"Date range: {df['date'].min()} to {df['date'].max()}")
df.to_csv('logs/parsed_events.csv', index=False)
EOF
cd ~/ueba-lab && python3 parse_events.py
Phase 2: Baseline User Behavior Profiling
3 Build per-user login time baseline

Calculate each user's normal working hours distribution:

cat > ~/ueba-lab/build_baseline.py << 'EOF'
import pandas as pd, numpy as np

df = pd.read_csv('logs/parsed_events.csv', parse_dates=['timestamp'])
logins = df[df['event_id'] == 4624]  # Successful logins only

# Per-user hourly profile (probability distribution)
user_profiles = {}
for user, group in logins.groupby('user'):
    hours = group['hour'].values
    profile = {
        'mean_hour': hours.mean(),
        'std_hour': hours.std(),
        'median_hour': np.median(hours),
        'typical_hours': list(pd.Series(hours).value_counts().head(5).index),
        'weekend_login_rate': group['is_weekend'].mean(),
        'after_hours_rate': group['is_after_hours'].mean(),
        'unique_workstations': group['workstation'].nunique(),
        'unique_source_ips': group['source_ip'].nunique(),
        'daily_avg_logins': len(group) / group['date'].nunique(),
        'failed_login_ratio': len(df[(df['user']==user)&(df['event_id']==4625)]) /
                              max(len(group), 1),
        'privileged_logins': len(df[(df['user']==user)&(df['event_id']==4672)]),
    }
    user_profiles[user] = profile

profile_df = pd.DataFrame(user_profiles).T
print("User behavior profiles:")
print(profile_df.round(3))
profile_df.to_csv('models/user_baselines.csv')
EOF
python3 build_baseline.py
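One way to use the `mean_hour` and `std_hour` columns of the baseline is a z-score for each new login hour. The sketch below is an illustration only: `hour_zscore` and the `min_std` floor are hypothetical names, and the baseline values are made up rather than taken from a real run. It also treats the hour as a linear quantity, so 23:00 and 01:00 count as far apart even though they are adjacent on the clock.

```python
def hour_zscore(login_hour, mean_hour, std_hour, min_std=0.5):
    """Number of standard deviations a login hour sits from the user's mean hour."""
    # Floor the spread so a user with near-identical login hours
    # does not produce an unbounded score
    std = max(std_hour, min_std)
    return abs(login_hour - mean_hour) / std

# Hypothetical baseline: mean login hour 10.0, standard deviation 2.0
z_normal = hour_zscore(11, mean_hour=10.0, std_hour=2.0)
z_night = hour_zscore(2, mean_hour=10.0, std_hour=2.0)
print(z_normal, z_night)  # 0.5 4.0
```

A cut-off around 3 is a common starting point for flagging, but the right value depends on how noisy each user's schedule is.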
4 Calculate per-user risk scores (rolling window)

Compute daily risk scores for each user based on deviation from their baseline:

cat > ~/ueba-lab/risk_score.py << 'EOF'
import pandas as pd, numpy as np

df = pd.read_csv('logs/parsed_events.csv', parse_dates=['timestamp'])
baselines = pd.read_csv('models/user_baselines.csv', index_col=0)

def daily_risk_score(user_df, baseline, user):
    if user not in baseline.index:
        return 0, []  # caller unpacks (score, reasons)
    b = baseline.loc[user]
    score = 0
    reasons = []

    # After-hours login
    after_hrs = (user_df['is_after_hours'] == 1).sum()
    if after_hrs > 0 and b['after_hours_rate'] < 0.05:
        score += 30 * after_hrs
        reasons.append(f"After-hours login x{after_hrs}")

    # Weekend login
    weekend = (user_df['is_weekend'] == 1).sum()
    if weekend > 0 and b['weekend_login_rate'] < 0.1:
        score += 20
        reasons.append("Weekend login (unusual)")

    # Login volume spike (count successful logons only, to match the baseline)
    daily_vol = (user_df['event_id'] == 4624).sum()
    if daily_vol > b['daily_avg_logins'] * 3:
        score += 25
        reasons.append(f"Login volume spike: {daily_vol:.0f} vs avg {b['daily_avg_logins']:.1f}")

    # Unusually many distinct source IPs in one day
    ip_count = user_df['source_ip'].nunique()
    if ip_count > b['unique_source_ips'] * 1.5:
        score += 20
        reasons.append("Unusual source IP count")

    # Failed logins
    fails = (user_df['event_id'] == 4625).sum() if 'event_id' in user_df else 0
    if fails >= 5:
        score += 15 * (fails // 5)
        reasons.append(f"{fails} failed login attempts")

    # Privileged login
    priv = (user_df['event_id'] == 4672).sum() if 'event_id' in user_df else 0
    if priv > b['privileged_logins'] / 30 * 2:
        score += 35
        reasons.append(f"Unusual privileged access x{priv}")

    return score, reasons

# Calculate daily scores
results = []
for (user, date), group in df.groupby(['user', 'date']):
    score, reasons = daily_risk_score(group, baselines, user)
    if score > 0:
        results.append({'user': user, 'date': date, 'risk_score': score,
                        'reasons': '; '.join(reasons)})

risk_df = pd.DataFrame(results).sort_values('risk_score', ascending=False)
print("High-risk events detected:")
print(risk_df[risk_df['risk_score'] >= 30].head(20))
risk_df.to_csv('reports/risk_scores.csv', index=False)
EOF
python3 risk_score.py
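The script scores each calendar day on its own. To get the rolling-window view named in the step title, the daily scores can be summed over a trailing window per user. This is a sketch under assumptions: `rolling_risk` and the 7-day default are hypothetical choices, and the demo rows are invented data in the same shape as `reports/risk_scores.csv`.

```python
import pandas as pd

def rolling_risk(risk_df, window_days=7):
    """Trailing sum of each user's daily risk_score over window_days calendar days."""
    df = risk_df.copy()
    df["date"] = pd.to_datetime(df["date"])
    parts = []
    for user, group in df.groupby("user"):
        # Time-based rolling needs a sorted DatetimeIndex
        daily = group.set_index("date")["risk_score"].sort_index()
        rolled = daily.rolling(f"{window_days}D").sum()
        parts.append(pd.DataFrame({"user": user, "date": rolled.index,
                                   "rolling_risk": rolled.values}))
    return pd.concat(parts, ignore_index=True)

# Hypothetical daily scores (not real lab output)
demo = pd.DataFrame({
    "user": ["alice", "alice", "alice", "bob"],
    "date": ["2024-02-01", "2024-02-05", "2024-02-12", "2024-02-12"],
    "risk_score": [30, 20, 60, 15],
})
print(rolling_risk(demo))
```

A trailing sum lets several moderately unusual days add up to an alert, which a per-day threshold alone would miss.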
Phase 3: Anomaly Detection & Insider Threat Patterns
5 Inject insider threat scenario

Simulate a rogue insider: "alice" suddenly starts logging in at 2:30 AM and accessing up to 10 previously unseen servers. A brute-force burst against "bob" is injected as well:

python3 << 'EOF'
import json, datetime, random

with open('logs/security_events.json') as f:
    events = json.load(f)

# Inject insider threat events — alice after hours, many workstations, privilege use
base_date = datetime.datetime(2024, 2, 12, 2, 30, 0)  # 2:30 AM
for i in range(30):
    events.append({
        'timestamp': (base_date + datetime.timedelta(minutes=i*3)).isoformat(),
        'user': 'alice',
        'event_id': random.choice([4624, 4624, 4648, 4672]),
        'logon_type': 3,  # Network logon
        'source_ip': f'10.10.0.{random.randint(50,80)}',  # Unfamiliar subnet, outside the lab's 192.168.56.0/24
        'workstation': f'SERVER{random.randint(1,10):02d}'
    })

# Inject brute force against bob
for i in range(20):
    events.append({
        'timestamp': (datetime.datetime(2024, 2, 12, 22, 0, 0) +
                      datetime.timedelta(seconds=i*10)).isoformat(),
        'user': 'bob',
        'event_id': 4625,  # Failed logon
        'logon_type': 3,
        'source_ip': '192.168.56.10',  # Kali attacker
        'workstation': 'DC01'
    })

with open('logs/security_events_injected.json', 'w') as f:
    json.dump(events, f)
print(f"Injected attack events. Total: {len(events)}")
EOF
6 Train Isolation Forest on user behavior vectors

Build user feature vectors and apply Isolation Forest across all users:

cat > ~/ueba-lab/ueba_model.py << 'EOF'
import pandas as pd, numpy as np, json
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

# Load injected data
with open('logs/security_events_injected.json') as f:
    events = json.load(f)
df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.hour
df['is_after_hours'] = ((df['hour'] < 7) | (df['hour'] >= 19)).astype(int)  # outside 07:00-19:00
df['is_weekend'] = df['timestamp'].dt.dayofweek.isin([5,6]).astype(int)
df['date'] = df['timestamp'].dt.date

# Daily feature vectors per user
daily_vectors = []
for (user, date), group in df.groupby(['user', 'date']):
    vec = {
        'user': user, 'date': str(date),
        'login_count': len(group[group['event_id']==4624]),
        'fail_count': len(group[group['event_id']==4625]),
        'priv_count': len(group[group['event_id']==4672]),
        'after_hours': group['is_after_hours'].sum(),
        'weekend': group['is_weekend'].sum(),
        'unique_workstations': group['workstation'].nunique(),
        'unique_ips': group['source_ip'].nunique(),
        'network_logons': (group['logon_type']==3).sum(),
        'explicit_creds': len(group[group['event_id']==4648]),
    }
    daily_vectors.append(vec)

vdf = pd.DataFrame(daily_vectors)
feature_cols = [c for c in vdf.columns if c not in ['user','date']]
X = vdf[feature_cols].fillna(0)

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

iso = IsolationForest(n_estimators=200, contamination=0.05, random_state=42)
vdf['anomaly'] = iso.fit_predict(X_scaled)
vdf['score'] = iso.score_samples(X_scaled)

anomalies = vdf[vdf['anomaly']==-1].sort_values('score')
print(f"\nAnomalous user-days detected: {len(anomalies)}")
print(anomalies[['user','date','login_count','fail_count','priv_count',
                   'after_hours','unique_workstations','score']].head(15))
vdf.to_csv('reports/ueba_results.csv', index=False)
EOF
cd ~/ueba-lab && python3 ueba_model.py
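Because the injected user-days are known, the model's ranking can be checked against them. The sketch below is one way to do that, assuming the result frame has the `user`, `date`, and `score` columns written by `ueba_model.py`. The function name `injected_auc` and the demo scores are hypothetical. With only a couple of positive user-days, the resulting AUC is a coarse estimate.

```python
import pandas as pd
from sklearn.metrics import roc_auc_score

def injected_auc(vdf, injected):
    """ROC AUC of the anomaly ranking against a set of known-bad (user, date) pairs."""
    labels = [1 if (user, date) in injected else 0
              for user, date in zip(vdf["user"], vdf["date"].astype(str))]
    # score_samples returns lower values for more anomalous rows,
    # so negate it to make higher mean more suspicious
    return roc_auc_score(labels, -vdf["score"])

# Hypothetical scores for three user-days (not real lab output)
demo = pd.DataFrame({
    "user": ["alice", "alice", "bob"],
    "date": ["2024-02-11", "2024-02-12", "2024-02-11"],
    "score": [-0.40, -0.70, -0.45],
})
print(injected_auc(demo, {("alice", "2024-02-12")}))
```

For the lab data, the injected pairs would be `("alice", "2024-02-12")` and `("bob", "2024-02-12")`, applied to the frame loaded from `reports/ueba_results.csv`.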
7 Detect lateral movement patterns

Identify users accessing unusual numbers of distinct hosts (lateral movement indicator):

cat > ~/ueba-lab/lateral_movement.py << 'EOF'
import pandas as pd, json

with open('logs/security_events_injected.json') as f:
    events = json.load(f)
df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['date'] = df['timestamp'].dt.date

# Per calendar day: flag users whose distinct-workstation count exceeds
# max(2x their baseline workstation count, 3)
baseline = pd.read_csv('models/user_baselines.csv', index_col=0)

lateral_flags = []
for (user, date), group in df.groupby(['user', 'date']):
    ws_count = group['workstation'].nunique()
    if user in baseline.index:
        baseline_ws = float(baseline.loc[user, 'unique_workstations'])
        if ws_count > max(baseline_ws * 2, 3):
            lateral_flags.append({
                'user': user, 'date': str(date),
                'workstations_accessed': ws_count,
                'baseline_workstations': baseline_ws,
                'workstation_list': list(group['workstation'].unique()),
                'source_ips': list(group['source_ip'].unique()),
                'risk': 'HIGH' if ws_count > baseline_ws * 3 else 'MEDIUM'
            })

for flag in sorted(lateral_flags, key=lambda x: -x['workstations_accessed']):
    print(f"\n[{flag['risk']}] {flag['user']} on {flag['date']}")
    print(f"  Workstations: {flag['workstations_accessed']} (baseline: {flag['baseline_workstations']:.1f})")
    print(f"  Systems accessed: {flag['workstation_list']}")
    print(f"  Source IPs: {flag['source_ips']}")
EOF
python3 lateral_movement.py
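The baseline file stores each user's workstation count over the whole collection period, which a single day rarely exceeds. An alternative is to baseline the per-day count and flag days above mean plus two standard deviations. The following sketch assumes an event frame with `user`, `date`, and `workstation` columns. The name `daily_workstation_outliers` and the multiplier `k` are hypothetical. The outlier day is included in its own baseline here, which inflates the threshold when history is short.

```python
import pandas as pd

def daily_workstation_outliers(df, k=2.0):
    """Return user-days whose distinct-workstation count exceeds mean + k*std for that user."""
    daily = (df.groupby(["user", "date"])["workstation"].nunique()
               .rename("ws_count").reset_index())
    # Per-user mean and sample standard deviation of the daily counts;
    # std is NaN for users seen on a single day, so treat it as 0
    stats = daily.groupby("user")["ws_count"].agg(["mean", "std"]).fillna(0)
    daily = daily.join(stats, on="user")
    daily["threshold"] = daily["mean"] + k * daily["std"]
    return daily[daily["ws_count"] > daily["threshold"]]

# Hypothetical data: ten ordinary days on two hosts, then one day on ten servers
rows = []
for day in range(1, 11):
    for ws in ("WS01", "WS02"):
        rows.append({"user": "alice", "date": f"2024-01-{day:02d}", "workstation": ws})
for n in range(10):
    rows.append({"user": "alice", "date": "2024-02-12", "workstation": f"SERVER{n:02d}"})
flagged = daily_workstation_outliers(pd.DataFrame(rows))
print(flagged[["user", "date", "ws_count"]])
```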
8 Detect brute force via failed login spike

Identify credential stuffing and brute-force patterns from Event ID 4625:

cat > ~/ueba-lab/brute_force.py << 'EOF'
import pandas as pd, json
from datetime import timedelta

with open('logs/security_events_injected.json') as f:
    events = json.load(f)
df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])

fails = df[df['event_id']==4625].copy()
fails = fails.sort_values('timestamp')

# Sliding 5-minute window: >=5 fails from same IP = brute force
THRESHOLD = 5
WINDOW = timedelta(minutes=5)

brute_force_alerts = []
for ip, group in fails.groupby('source_ip'):
    group = group.sort_values('timestamp')
    for i, row in group.iterrows():
        window = group[(group['timestamp'] >= row['timestamp']) &
                       (group['timestamp'] <= row['timestamp'] + WINDOW)]
        if len(window) >= THRESHOLD:
            brute_force_alerts.append({
                'source_ip': ip,
                'start_time': row['timestamp'].isoformat(),
                'attempts': len(window),
                'targeted_users': list(window['user'].unique()),
                'targeted_workstations': list(window['workstation'].unique()),
            })
            break

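for alert in brute_force_alerts:
    # One targeted account suggests password guessing; several suggest spraying
    technique = ('T1110.003 (Password Spraying)' if len(alert['targeted_users']) > 1
                 else 'T1110.001 (Password Guessing)')
    print(f"\n[ALERT] Brute Force from {alert['source_ip']}")
    print(f"  Time: {alert['start_time']}")
    print(f"  Attempts: {alert['attempts']} in 5 minutes")
    print(f"  Targeted: {alert['targeted_users']}")
    print(f"  MITRE: {technique}")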
EOF
python3 brute_force.py
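The script re-filters the whole group for every row, which is fine for a lab dataset but grows quadratically with the number of failures. A sliding window over sorted timestamps gives the same count in a single pass. This is a sketch of that technique rather than a replacement for the script; `max_failures_in_window` is a hypothetical helper and the timestamps are invented.

```python
from collections import deque
from datetime import datetime, timedelta

def max_failures_in_window(timestamps, window=timedelta(minutes=5)):
    """Largest number of events that fall within any span of length `window`."""
    recent = deque()  # timestamps still inside the current window
    best = 0
    for ts in sorted(timestamps):
        recent.append(ts)
        # Drop events that are more than `window` older than the newest one
        while ts - recent[0] > window:
            recent.popleft()
        best = max(best, len(recent))
    return best

# Hypothetical burst: 20 failures 10 seconds apart, plus one straggler later
start = datetime(2024, 2, 12, 22, 0, 0)
burst = [start + timedelta(seconds=10 * i) for i in range(20)]
burst.append(start + timedelta(minutes=30))
print(max_failures_in_window(burst))  # 20
```

Calling this once per source IP and comparing the result with the threshold of 5 reproduces the alert condition used above.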
Phase 4: Visualization & Reporting
9 Generate user risk heatmap
cat > ~/ueba-lab/visualize.py << 'EOF'
import pandas as pd, numpy as np, matplotlib.pyplot as plt, seaborn as sns

vdf = pd.read_csv('reports/ueba_results.csv')
vdf['date'] = pd.to_datetime(vdf['date'])
vdf['date_str'] = vdf['date'].dt.strftime('%m/%d')

# Pivot: users vs date, value = anomaly score (inverted so higher = worse)
pivot = vdf.pivot_table(
    values='score', index='user', columns='date_str', aggfunc='min')
pivot_inverted = -pivot  # Isolation Forest: more negative = more anomalous

plt.figure(figsize=(16, 5))
sns.heatmap(pivot_inverted, cmap='Reds', linewidths=0.3,
            linecolor='gray', annot=False)
plt.title('UEBA Risk Heatmap — User Anomaly Scores Over Time')
plt.xlabel('Date'); plt.ylabel('User')
plt.tight_layout()
plt.savefig('reports/ueba_heatmap.png', dpi=150)
print("Risk heatmap saved to reports/ueba_heatmap.png")

# Login hour distribution per user
fig, axes = plt.subplots(1, vdf['user'].nunique(), figsize=(15, 3))
import json
with open('logs/security_events_injected.json') as f:
    events = json.load(f)
df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.hour

for ax, (user, group) in zip(axes, df[df['event_id']==4624].groupby('user')):
    ax.hist(group['hour'], bins=24, range=(0,24), color='#0e5f7a', alpha=0.8)
    ax.set_title(user, fontsize=9)
    ax.set_xlabel('Hour'); ax.set_xlim(0,24)
fig.suptitle('Login Hour Distribution per User', fontsize=12)
plt.tight_layout()
plt.savefig('reports/login_hours.png', dpi=150)
print("Login hour chart saved")
EOF
python3 visualize.py
10 Write Sigma rules for UEBA detections
cat > ~/ueba-lab/sigma_after_hours.yml << 'EOF'
title: After-Hours Login — Insider Threat Indicator
id: c3d4e5f6-a7b8-9012-cdef-012345678901
status: experimental
description: >
  Detects interactive or network logons (Event ID 4624). Sigma has no
  time-of-day condition, so restrict matches to outside business hours
  (before 07:00 or from 19:00) in the SIEM query or a post-processing step,
  and compare against each user's historical login pattern there.
author: CyberSec Pro Academy - L07
logsource:
    product: windows
    service: security
    definition: 'Event ID 4624'
detection:
    selection:
        EventID: 4624
        LogonType:
            - 2    # Interactive
            - 3    # Network
            - 10   # RemoteInteractive
    filter_machine_accounts:
        TargetUserName|endswith: '$'
    condition: selection and not filter_machine_accounts
falsepositives:
    - Authorized after-hours maintenance
    - On-call staff responding to incidents
level: medium
tags:
    - attack.initial_access
    - attack.t1078
    - insider_threat
EOF
echo "Sigma rule created"
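Sigma's field matching does not, as far as its specification describes, include a time-of-day condition, so the business-hours part of this detection is usually applied where the rule's matches are consumed. The sketch below shows that check against the lab's flat event dicts. `is_after_hours_logon` is a hypothetical helper, and the 07:00 to 19:00 window is taken from the rule description.

```python
from datetime import datetime

BUSINESS_START, BUSINESS_END = 7, 19   # business hours: 07:00 up to 19:00
LOGON_TYPES = {2, 3, 10}               # interactive, network, remote interactive

def is_after_hours_logon(event):
    """True for a successful user logon outside business hours."""
    if event.get("event_id") != 4624 or event.get("logon_type") not in LOGON_TYPES:
        return False
    # Account names ending in '$' are machine accounts; skip them
    if str(event.get("user", "")).endswith("$"):
        return False
    hour = datetime.fromisoformat(event["timestamp"]).hour
    return hour < BUSINESS_START or hour >= BUSINESS_END

# Hypothetical event in the lab's schema
night = {"timestamp": "2024-02-12T02:30:00", "user": "alice",
         "event_id": 4624, "logon_type": 3}
print(is_after_hours_logon(night))  # True
```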
11 Map all detections to MITRE ATT&CK
Detection                        ATT&CK ID                                            Tactic
After-hours login                T1078 (Valid Accounts)                               Initial Access
Lateral movement (many hosts)    T1021 (Remote Services)                              Lateral Movement
Brute force detection            T1110 (Brute Force: .001 guessing, .003 spraying)    Credential Access
Privileged access spike          T1078.003 (Local Accounts)                           Privilege Escalation
Explicit credential use          T1550.002 (Pass-the-Hash)                            Defense Evasion
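To carry this mapping into the report, each alert can be tagged from a lookup table. This is a small sketch with hypothetical names (`ATTACK_MAP`, `tag_alert`, and the detection keys). It uses the parent technique T1110 for brute force, since the sub-technique depends on how many accounts were targeted.

```python
# Detection key -> (technique ID, technique name, tactic)
ATTACK_MAP = {
    "after_hours_login": ("T1078", "Valid Accounts", "Initial Access"),
    "lateral_movement": ("T1021", "Remote Services", "Lateral Movement"),
    "brute_force": ("T1110", "Brute Force", "Credential Access"),
    "privileged_access_spike": ("T1078.003", "Local Accounts", "Privilege Escalation"),
    "explicit_credential_use": ("T1550.002", "Pass the Hash", "Defense Evasion"),
}

def tag_alert(alert):
    """Return a copy of the alert dict with ATT&CK fields added, if the detection is mapped."""
    tagged = dict(alert)
    mapping = ATTACK_MAP.get(alert.get("detection"))
    if mapping:
        tagged["attack_id"], tagged["technique"], tagged["tactic"] = mapping
    return tagged

# Hypothetical alert record
example = tag_alert({"detection": "brute_force", "source_ip": "192.168.56.10"})
print(example["attack_id"], example["tactic"])  # T1110 Credential Access
```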
12 Document findings and produce UEBA report

Lab Findings

FindingDetails
Insider threat user detected
Brute force source IP
Lateral movement host count
Anomaly model AUC (if labeled)
MITRE techniques detected

Next: Lab L08 — AI Incident Response Automation

Investigate a simulated ransomware incident using SIFT Workstation and AI-assisted analysis.
