Build a UEBA system that profiles normal user behavior from Windows Event Logs and flags anomalies — after-hours logins, privilege escalation, lateral movement, and insider threat patterns. Use Python, pandas, and scikit-learn on SIFT Workstation.
On SIFT Workstation (192.168.56.20), install required Python packages:
# Install pip and venv support system-wide.
sudo apt update && sudo apt install -y python3-pip python3-venv
# Create and activate an isolated virtual environment for the lab.
python3 -m venv ~/ueba-lab
source ~/ueba-lab/bin/activate
pip install pandas numpy scikit-learn matplotlib seaborn python-evtx pywinrm
# -p keeps this step re-runnable when the directories already exist.
mkdir -p ~/ueba-lab/logs ~/ueba-lab/models ~/ueba-lab/reports
Download a sample Windows Security Event Log dataset:
cd ~/ueba-lab/logs
# Download a sample attack EVTX from the EVTX-ATTACK-SAMPLES GitHub repository
wget -O Security.evtx \
  "https://github.com/sbousseaden/EVTX-ATTACK-SAMPLES/raw/master/Credential%20Access/credential_access_lsass_access_meterpreter.evtx"
# Or generate synthetic log data for the lab
python3 << 'EOF'
"""Generate 5000 synthetic logon events and write them to security_events.json."""
import datetime
import json
import random

users = ['alice', 'bob', 'charlie', 'admin', 'svc_backup']
events = []
# Anchor at midnight: the sampled hour is added as an offset below, so the
# base must be 00:00 for that offset to equal the event's hour of day.
base = datetime.datetime(2024, 1, 15, 0, 0, 0)

for _ in range(5000):
    user = random.choices(users, weights=[40, 30, 20, 8, 2])[0]
    # Regular users are centred on 10:00; admin is centred on 14:00 with a
    # wider spread.
    hour = random.gauss(10, 2) if user != 'admin' else random.gauss(14, 4)
    hour = max(0, min(23, int(hour)))  # clamp to a valid hour of day
    events.append({
        'timestamp': (base + datetime.timedelta(
            days=random.randint(0, 30),
            hours=hour,
            minutes=random.randint(0, 59))).isoformat(),
        'user': user,
        'event_id': random.choices([4624, 4625, 4648, 4672, 4768],
                                   [50, 10, 5, 3, 2])[0],
        'logon_type': random.choice([2, 3, 7, 10]),
        'source_ip': f'192.168.56.{random.randint(10, 50)}',
        'workstation': f'WS{random.randint(1, 20):02d}',
    })

with open('security_events.json', 'w') as f:
    json.dump(events, f)
print(f'Generated {len(events)} events')
EOF
Key Event IDs for UEBA analysis:
| Event ID | Description | UEBA Relevance |
|---|---|---|
| 4624 | Successful logon | Login time baseline |
| 4625 | Failed logon | Brute force, wrong creds |
| 4648 | Explicit credential logon | Pass-the-Hash, lateral movement |
| 4672 | Special privilege assigned | Privilege escalation |
| 4768/4769 | Kerberos TGT/service ticket | Kerberoasting |
| 4776 | NTLM auth attempt | Legacy auth, PtH |
cat > ~/ueba-lab/parse_events.py << 'EOF'
"""Load the synthetic security events and derive time-based feature columns.

Reads logs/security_events.json and writes logs/parsed_events.csv.
"""
import json

import pandas as pd

with open('logs/security_events.json') as f:
    events = json.load(f)

df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek  # 0=Mon, 6=Sun
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
# Business hours are 07:00-18:59, so 19:00 onwards counts as after hours
# (>= 19, not > 19, which would treat 19:00-19:59 as business time).
df['is_after_hours'] = ((df['hour'] < 7) | (df['hour'] >= 19)).astype(int)
df['date'] = df['timestamp'].dt.date

print("Event distribution:")
print(df['event_id'].value_counts())
print(f"\nTotal events: {len(df)}")
print(f"Unique users: {df['user'].nunique()}")
print(f"Date range: {df['date'].min()} to {df['date'].max()}")

df.to_csv('logs/parsed_events.csv', index=False)
EOF
cd ~/ueba-lab && python3 parse_events.py
Calculate each user's normal working hours distribution:
cat > ~/ueba-lab/build_baseline.py << 'EOF'
"""Build a per-user behaviour baseline from successful logons (Event ID 4624).

Reads logs/parsed_events.csv and writes models/user_baselines.csv with one
row per user.
"""
import numpy as np
import pandas as pd

df = pd.read_csv('logs/parsed_events.csv', parse_dates=['timestamp'])
logins = df[df['event_id'] == 4624]  # Successful logins only

# Count failed (4625) and privileged (4672) events per user once, instead of
# re-filtering the full table for every user inside the loop.
failed_counts = df.loc[df['event_id'] == 4625, 'user'].value_counts()
priv_counts = df.loc[df['event_id'] == 4672, 'user'].value_counts()

# Per-user profile built from that user's successful logons.
user_profiles = {}
for user, group in logins.groupby('user'):
    hours = group['hour'].values
    user_profiles[user] = {
        'mean_hour': hours.mean(),
        'std_hour': hours.std(),
        'median_hour': np.median(hours),
        # The five most frequent logon hours for this user.
        'typical_hours': list(pd.Series(hours).value_counts().head(5).index),
        'weekend_login_rate': group['is_weekend'].mean(),
        'after_hours_rate': group['is_after_hours'].mean(),
        'unique_workstations': group['workstation'].nunique(),
        'unique_source_ips': group['source_ip'].nunique(),
        'daily_avg_logins': len(group) / group['date'].nunique(),
        'failed_login_ratio': failed_counts.get(user, 0) / max(len(group), 1),
        'privileged_logins': priv_counts.get(user, 0),
    }

# orient='index' keeps a proper dtype per column. Transposing a frame built
# from the nested dict yields all-object columns, on which round() is a no-op.
profile_df = pd.DataFrame.from_dict(user_profiles, orient='index')

print("User behavior profiles:")
print(profile_df.round(3))
profile_df.to_csv('models/user_baselines.csv')
EOF
python3 build_baseline.py
Compute daily risk scores for each user based on deviation from their baseline:
cat > ~/ueba-lab/risk_score.py << 'EOF'
"""Score each user-day against that user's baseline profile.

Reads logs/parsed_events.csv and models/user_baselines.csv and writes
reports/risk_scores.csv.
"""
import numpy as np
import pandas as pd

df = pd.read_csv('logs/parsed_events.csv', parse_dates=['timestamp'])
baselines = pd.read_csv('models/user_baselines.csv', index_col=0)


def daily_risk_score(user_df, baseline, user):
    """Score one user's events for a single day against their baseline.

    Args:
        user_df: Rows of the parsed event table for one user on one date.
        baseline: Baseline table indexed by user name.
        user: User name to look up in ``baseline``.

    Returns:
        A ``(score, reasons)`` tuple. ``(0, [])`` when the user has no
        baseline row.
    """
    if user not in baseline.index:
        # Same shape as the normal return so callers can always unpack it.
        return 0, []

    b = baseline.loc[user]
    score = 0
    reasons = []
    event_ids = user_df['event_id']

    # After-hours activity from a user whose baseline shows almost none.
    after_hrs = int((user_df['is_after_hours'] == 1).sum())
    if after_hrs > 0 and b['after_hours_rate'] < 0.05:
        score += 30 * after_hrs
        reasons.append(f"After-hours login x{after_hrs}")

    # Weekend activity from a user who rarely works weekends.
    weekend = int((user_df['is_weekend'] == 1).sum())
    if weekend > 0 and b['weekend_login_rate'] < 0.1:
        score += 20
        reasons.append("Weekend login (unusual)")

    # Login volume spike. Count successful logons (4624) only, because the
    # value is compared against a daily_avg_logins column named for logons.
    daily_vol = int((event_ids == 4624).sum())
    if daily_vol > b['daily_avg_logins'] * 3:
        score += 25
        reasons.append(f"Login volume spike: {daily_vol:.0f} vs avg {b['daily_avg_logins']:.1f}")

    # Unusually many distinct source IPs in one day.
    # NOTE(review): presumably unique_source_ips is a whole-period total, in
    # which case a single day rarely exceeds it - confirm against the baseline.
    ip_count = user_df['source_ip'].nunique()
    if ip_count > b['unique_source_ips'] * 1.5:
        score += 20
        reasons.append("Unusual source IP count")

    # Failed logins: 15 points per full block of five failures.
    fails = int((event_ids == 4625).sum())
    if fails >= 5:
        score += 15 * (fails // 5)
        reasons.append(f"{fails} failed login attempts")

    # Privileged logins above twice the baseline total spread over 30 days.
    priv = int((event_ids == 4672).sum())
    if priv > b['privileged_logins'] / 30 * 2:
        score += 35
        reasons.append(f"Unusual privileged access x{priv}")

    return score, reasons


# Calculate daily scores
results = []
for (user, date), group in df.groupby(['user', 'date']):
    score, reasons = daily_risk_score(group, baselines, user)
    if score > 0:
        results.append({'user': user, 'date': date, 'risk_score': score,
                        'reasons': '; '.join(reasons)})

# Explicit columns keep sort_values() working when nothing scored above zero.
risk_df = pd.DataFrame(results, columns=['user', 'date', 'risk_score', 'reasons'])
risk_df = risk_df.sort_values('risk_score', ascending=False)

print("High-risk events detected:")
print(risk_df[risk_df['risk_score'] >= 30].head(20))
risk_df.to_csv('reports/risk_scores.csv', index=False)
EOF
python3 risk_score.py
Simulate a rogue insider: "alice" suddenly starts logging in at 2am, accessing 10 new workstations:
python3 << 'EOF'
"""Append simulated attack events to the synthetic log and save a new file."""
import datetime
import json
import random

with open('logs/security_events.json') as src:
    events = json.load(src)

# Insider scenario: alice is active from 02:30, one event every 3 minutes,
# over network logons to SERVERxx hosts with a mix of logon, explicit
# credential and special-privilege events.
insider_start = datetime.datetime(2024, 2, 12, 2, 30, 0)
events.extend(
    {
        'timestamp': (insider_start + datetime.timedelta(minutes=3 * step)).isoformat(),
        'user': 'alice',
        'event_id': random.choice([4624, 4624, 4648, 4672]),
        'logon_type': 3,  # Network logon
        # Addresses outside the 192.168.56.x range used by the lab hosts
        'source_ip': f'10.10.0.{random.randint(50,80)}',
        'workstation': f'SERVER{random.randint(1,10):02d}',
    }
    for step in range(30)
)

# Brute-force scenario: 20 failed logons for bob, 10 seconds apart,
# all from one source address against DC01.
brute_start = datetime.datetime(2024, 2, 12, 22, 0, 0)
events.extend(
    {
        'timestamp': (brute_start + datetime.timedelta(seconds=10 * attempt)).isoformat(),
        'user': 'bob',
        'event_id': 4625,  # Failed logon
        'logon_type': 3,
        'source_ip': '192.168.56.10',  # Kali attacker
        'workstation': 'DC01',
    }
    for attempt in range(20)
)

with open('logs/security_events_injected.json', 'w') as dst:
    json.dump(events, dst)
print(f"Injected attack events. Total: {len(events)}")
EOF
Build user feature vectors and apply Isolation Forest across all users:
cat > ~/ueba-lab/ueba_model.py << 'EOF'
"""Fit an Isolation Forest on per-user daily feature vectors.

Reads logs/security_events_injected.json and writes reports/ueba_results.csv
with an anomaly label and score for every user-day.
"""
import json

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

# Load injected data
with open('logs/security_events_injected.json') as f:
    events = json.load(f)

df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.hour
# Business hours are 07:00-18:59, so 19:00 onwards counts as after hours
# (>= 19, not > 19, which would treat 19:00-19:59 as business time).
df['is_after_hours'] = ((df['hour'] < 7) | (df['hour'] >= 19)).astype(int)
df['is_weekend'] = df['timestamp'].dt.dayofweek.isin([5, 6]).astype(int)
df['date'] = df['timestamp'].dt.date

# Daily feature vectors per user
daily_vectors = []
for (user, date), group in df.groupby(['user', 'date']):
    event_ids = group['event_id']
    daily_vectors.append({
        'user': user,
        'date': str(date),
        'login_count': int((event_ids == 4624).sum()),
        'fail_count': int((event_ids == 4625).sum()),
        'priv_count': int((event_ids == 4672).sum()),
        'after_hours': group['is_after_hours'].sum(),
        'weekend': group['is_weekend'].sum(),
        'unique_workstations': group['workstation'].nunique(),
        'unique_ips': group['source_ip'].nunique(),
        'network_logons': (group['logon_type'] == 3).sum(),
        'explicit_creds': int((event_ids == 4648).sum()),
    })

vdf = pd.DataFrame(daily_vectors)
# Every column except the two identifiers is a model feature.
feature_cols = [c for c in vdf.columns if c not in ['user', 'date']]
X = vdf[feature_cols].fillna(0)

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

iso = IsolationForest(n_estimators=200, contamination=0.05, random_state=42)
vdf['anomaly'] = iso.fit_predict(X_scaled)   # -1 marks an anomalous user-day
vdf['score'] = iso.score_samples(X_scaled)   # lower = more anomalous

anomalies = vdf[vdf['anomaly'] == -1].sort_values('score')
print(f"\nAnomalous user-days detected: {len(anomalies)}")
print(anomalies[['user', 'date', 'login_count', 'fail_count', 'priv_count',
                 'after_hours', 'unique_workstations', 'score']].head(15))

vdf.to_csv('reports/ueba_results.csv', index=False)
EOF
cd ~/ueba-lab && python3 ueba_model.py
Identify users accessing unusual numbers of distinct hosts (lateral movement indicator):
cat > ~/ueba-lab/lateral_movement.py << 'EOF'
"""Flag user-days with an unusually high number of distinct hosts.

A day is flagged when the user's distinct-workstation count exceeds that
user's normal daily mean + 2 * std (and at least 3). The normal daily
distribution is taken from the pre-injection table logs/parsed_events.csv,
so both sides of the comparison are per-day counts.
"""
import json

import pandas as pd

with open('logs/security_events_injected.json') as f:
    events = json.load(f)

df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['date'] = df['timestamp'].dt.date

# Per-user distribution of distinct workstations per day in the clean data.
clean = pd.read_csv('logs/parsed_events.csv')
daily_clean = clean.groupby(['user', 'date'])['workstation'].nunique()
# std is NaN for a user seen on a single day; treat that as zero spread.
ws_stats = daily_clean.groupby(level='user').agg(['mean', 'std']).fillna(0)

lateral_flags = []
for (user, date), group in df.groupby(['user', 'date']):
    if user not in ws_stats.index:
        continue  # no baseline for this user
    ws_count = group['workstation'].nunique()
    baseline_ws = float(ws_stats.loc[user, 'mean'])
    spread = float(ws_stats.loc[user, 'std'])
    # Floor of 3 avoids flagging users whose normal count is tiny.
    if ws_count > max(baseline_ws + 2 * spread, 3):
        lateral_flags.append({
            'user': user,
            'date': str(date),
            'workstations_accessed': ws_count,
            'baseline_workstations': baseline_ws,
            'workstation_list': list(group['workstation'].unique()),
            'source_ips': list(group['source_ip'].unique()),
            # Beyond three standard deviations is escalated to HIGH.
            'risk': 'HIGH' if ws_count > baseline_ws + 3 * spread else 'MEDIUM',
        })

for flag in sorted(lateral_flags, key=lambda x: -x['workstations_accessed']):
    print(f"\n[{flag['risk']}] {flag['user']} on {flag['date']}")
    print(f" Workstations: {flag['workstations_accessed']} (baseline: {flag['baseline_workstations']:.1f})")
    print(f" Systems accessed: {flag['workstation_list']}")
    print(f" Source IPs: {flag['source_ips']}")
EOF
python3 lateral_movement.py
Identify credential stuffing and brute-force patterns from Event ID 4625:
cat > ~/ueba-lab/brute_force.py << 'EOF'
"""Detect bursts of failed logons (Event ID 4625) from a single source IP.

Reads logs/security_events_injected.json and prints one alert per source IP
for the first 5-minute window that contains at least THRESHOLD failures.
"""
import json
from datetime import timedelta

import pandas as pd

with open('logs/security_events_injected.json') as f:
    events = json.load(f)

df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])
fails = df[df['event_id'] == 4625].copy()
fails = fails.sort_values('timestamp')

# Sliding 5-minute window: >=5 fails from same IP = brute force
THRESHOLD = 5
WINDOW = timedelta(minutes=5)

brute_force_alerts = []
for ip, group in fails.groupby('source_ip'):
    group = group.sort_values('timestamp')
    for _, row in group.iterrows():
        # Failures from this IP within WINDOW after the current one.
        window = group[(group['timestamp'] >= row['timestamp']) &
                       (group['timestamp'] <= row['timestamp'] + WINDOW)]
        if len(window) >= THRESHOLD:
            targeted_users = list(window['user'].unique())
            # Many failures against one account is password guessing; the
            # same source hitting several accounts is password spraying.
            if len(targeted_users) > 1:
                technique = 'T1110.003 — Password Spraying'
            else:
                technique = 'T1110.001 — Password Guessing'
            brute_force_alerts.append({
                'source_ip': ip,
                'start_time': row['timestamp'].isoformat(),
                'attempts': len(window),
                'targeted_users': targeted_users,
                'targeted_workstations': list(window['workstation'].unique()),
                'technique': technique,
            })
            break  # report only the first qualifying window per source IP

for alert in brute_force_alerts:
    print(f"\n[ALERT] Brute Force from {alert['source_ip']}")
    print(f" Time: {alert['start_time']}")
    print(f" Attempts: {alert['attempts']} in 5 minutes")
    print(f" Targeted: {alert['targeted_users']}")
    print(f" MITRE: {alert['technique']}")
EOF
python3 brute_force.py
cat > ~/ueba-lab/visualize.py << 'EOF'
"""Plot the anomaly-score heatmap and the per-user login-hour histograms.

Reads reports/ueba_results.csv and logs/security_events_injected.json and
writes reports/ueba_heatmap.png and reports/login_hours.png.
"""
import json

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

vdf = pd.read_csv('reports/ueba_results.csv')
vdf['date'] = pd.to_datetime(vdf['date'])
vdf['date_str'] = vdf['date'].dt.strftime('%m/%d')

# Pivot: users vs date, value = anomaly score (inverted so higher = worse)
pivot = vdf.pivot_table(
    values='score', index='user', columns='date_str', aggfunc='min')
pivot_inverted = -pivot  # Isolation Forest: more negative = more anomalous

plt.figure(figsize=(16, 5))
sns.heatmap(pivot_inverted, cmap='Reds', linewidths=0.3,
            linecolor='gray', annot=False)
plt.title('UEBA Risk Heatmap — User Anomaly Scores Over Time')
plt.xlabel('Date')
plt.ylabel('User')
plt.tight_layout()
plt.savefig('reports/ueba_heatmap.png', dpi=150)
print("Risk heatmap saved to reports/ueba_heatmap.png")

# Login hour distribution per user
with open('logs/security_events_injected.json') as f:
    events = json.load(f)

df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.hour

# One panel per user that actually has successful logons, so the panel
# count always matches the groups being plotted.
login_groups = list(df[df['event_id'] == 4624].groupby('user'))
# squeeze=False always returns a 2-D array of axes, so a single user no
# longer yields a bare Axes object that cannot be iterated.
fig, axes = plt.subplots(1, max(len(login_groups), 1), figsize=(15, 3),
                         squeeze=False)
for ax, (user, group) in zip(axes.ravel(), login_groups):
    ax.hist(group['hour'], bins=24, range=(0, 24), color='#0e5f7a', alpha=0.8)
    ax.set_title(user, fontsize=9)
    ax.set_xlabel('Hour')
    ax.set_xlim(0, 24)

fig.suptitle('Login Hour Distribution per User', fontsize=12)
plt.tight_layout()
plt.savefig('reports/login_hours.png', dpi=150)
print("Login hour chart saved")
EOF
python3 visualize.py
cat > ~/ueba-lab/sigma_after_hours.yml << 'EOF'
title: After-Hours Login — Insider Threat Indicator
id: c3d4e5f6-a7b8-9012-cdef-012345678901
status: experimental
description: >
  Detects interactive or network logon outside business hours (7am-7pm)
  for users who historically don't log in during these windows.
  The rule itself only selects the logon events: the after-hours window
  (00:00-07:00 or 19:00-23:59) has to be applied by the SIEM query or
  schedule that runs it.
author: CyberSec Pro Academy - L07
logsource:
  product: windows
  service: security
  definition: 'Event ID 4624'
detection:
  # No time-of-day condition here: the previous
  # "timeframe: business_hours_outside" was not a valid value, since
  # timeframe expects a duration such as 15m.
  selection:
    EventID: 4624
    LogonType:
      - 2   # Interactive
      - 3   # Network
      - 10  # RemoteInteractive
  filter_service_accounts:
    # Account names ending in '$' (computer / managed service accounts)
    TargetUserName|endswith: '$'
  condition: selection and not filter_service_accounts
falsepositives:
  - Authorized after-hours maintenance
  - On-call staff responding to incidents
level: medium
tags:
  - attack.initial_access
  - attack.t1078
  - insider_threat
EOF
echo "Sigma rule created"
| Detection | ATT&CK ID | Tactic |
|---|---|---|
| After-hours login | T1078 — Valid Accounts | Initial Access |
| Lateral movement (many hosts) | T1021 — Remote Services | Lateral Movement |
| Brute force detection | T1110 — Brute Force (T1110.001 Password Guessing, T1110.003 Password Spraying) | Credential Access |
| Privileged access spike | T1078.003 — Local Accounts | Privilege Escalation |
| Explicit credential use | T1550.002 — Pass-the-Hash | Defense Evasion |
| Finding | Details |
|---|---|
| Insider threat user detected | |
| Brute force source IP | |
| Lateral movement host count | |
| Anomaly model AUC (if labeled) | |
| MITRE techniques detected | |