Intermediate AI-Assisted Detection Engineering

L05: AI-Powered Phishing Detection

Build a machine learning phishing classifier using Python, scikit-learn, and NLP techniques. Analyze email headers, URLs, and body content to detect phishing campaigns at scale — then use AI to refine your model and explain predictions.

Python 3 scikit-learn NLTK pandas Kali Linux Jupyter
Phase 1: Environment & Dataset Setup
1 Install Python ML dependencies on Kali

On your Kali Linux VM, set up a Python virtual environment and install required packages:

sudo apt update && sudo apt install -y python3-pip python3-venv
python3 -m venv ~/phishing-lab
source ~/phishing-lab/bin/activate
# Install Jupyter inside the venv so the notebook kernel can import these packages
pip install scikit-learn pandas numpy nltk matplotlib seaborn requests beautifulsoup4 imbalanced-learn notebook

Verify installation:

python3 -c "import sklearn, pandas, nltk; print('All packages installed')"
2 Download and prepare phishing datasets

Download a phishing URL dataset (the GregaVrbancic Phishing-Dataset on GitHub) and the SpamAssassin public email corpus:

mkdir -p ~/phishing-lab/data && cd ~/phishing-lab/data

# Phishing URLs dataset (PhishTank-derived)
wget -O phishing_urls.csv \
  "https://web.archive.org/web/2024/https://raw.githubusercontent.com/GregaVrbancic/Phishing-Dataset/master/dataset_full.csv"

# SpamAssassin public corpus (legitimate + spam emails)
wget https://spamassassin.apache.org/old/publiccorpus/20030228_easy_ham.tar.bz2
wget https://spamassassin.apache.org/old/publiccorpus/20030228_spam_2.tar.bz2
tar -xjf 20030228_easy_ham.tar.bz2
tar -xjf 20030228_spam_2.tar.bz2
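
Before moving on, it can help to confirm that both archives extracted where the later steps expect them. The sketch below counts message files per folder. The helper name `count_messages` is hypothetical, and skipping a `cmds` index file is an assumption about the corpus layout; the check is harmless if no such file exists.

```python
from pathlib import Path

def count_messages(folder):
    """Count regular files in a corpus folder, ignoring a `cmds` index file if present."""
    root = Path(folder).expanduser()
    if not root.is_dir():
        return 0  # folder missing: the archive was probably not extracted here
    return sum(1 for p in root.iterdir() if p.is_file() and p.name != 'cmds')

if __name__ == '__main__':
    # Folder names match the tar archives extracted above
    for name in ('easy_ham', 'spam_2'):
        print(name, count_messages(f'~/phishing-lab/data/{name}'))
```

A count of zero for either folder suggests the `tar` step ran in a different directory.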
3 Launch Jupyter and create analysis notebook

Start Jupyter Notebook in your virtual environment:

source ~/phishing-lab/bin/activate
cd ~/phishing-lab
jupyter notebook --ip=127.0.0.1 --port=8888 --no-browser &
# Bound to loopback only; access at http://localhost:8888 in the VM's browser

Create a new notebook called phishing_detection.ipynb and import libraries:

import pandas as pd
import numpy as np
import re
import nltk
from urllib.parse import urlparse
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.pipeline import Pipeline
import matplotlib.pyplot as plt
import seaborn as sns

nltk.download(['stopwords', 'punkt', 'wordnet'])
Phase 2: Feature Engineering & URL Analysis
4 Extract URL-based phishing features

Implement URL feature extraction — phishing URLs have telltale structural patterns:

def extract_url_features(url):
    parsed = urlparse(url)
    domain = parsed.netloc
    path = parsed.path

    features = {
        'url_length': len(url),
        'domain_length': len(domain),
        'path_length': len(path),
        'num_dots': url.count('.'),
        'num_hyphens': url.count('-'),
        'num_at': url.count('@'),
        'num_subdomains': domain.count('.'),
        'has_ip': int(bool(re.match(r'\d{1,3}(\.\d{1,3}){3}(:\d+)?$', domain))),
        'has_https': int(parsed.scheme == 'https'),
        'has_port': int(':' in domain),
        'path_depth': len([p for p in path.split('/') if p]),
        'num_params': len(parsed.query.split('&')) if parsed.query else 0,
        'suspicious_keywords': int(bool(re.search(
            r'login|signin|account|update|verify|secure|bank|paypal|amazon',
            url.lower()))),
        'url_entropy': -sum((url.count(c)/len(url)) * np.log2(url.count(c)/len(url))
                           for c in set(url) if url.count(c) > 0),
    }
    return features

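# Apply to dataset (assumes raw URLs in a 'url' column; some datasets ship
# precomputed numeric features instead, in which case those are used as-is)
df = pd.read_csv('data/phishing_urls.csv')
if 'url' in df.columns:
    features_df = pd.DataFrame([extract_url_features(u) for u in df['url']])
else:
    features_df = df.select_dtypes('number')
print(features_df.describe())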
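
The `url_entropy` feature above is the Shannon entropy of the URL's character distribution: random-looking strings score higher than repetitive ones. As a minimal sketch of the same quantity in isolation (the function name `shannon_entropy` is introduced here for illustration), counting characters once with `Counter` avoids rescanning the string for every character:

```python
import math
from collections import Counter

def shannon_entropy(text):
    """Shannon entropy of the character distribution, in bits per character."""
    if not text:
        return 0.0
    total = len(text)
    # Sum -p * log2(p) over the relative frequency p of each distinct character
    return sum(-(n / total) * math.log2(n / total)
               for n in Counter(text).values())

if __name__ == '__main__':
    for sample in ('aaaa', 'abab', 'abcd'):
        print(sample, shannon_entropy(sample))
```

A string made of one repeated character carries 0 bits per character, two equally frequent characters carry 1 bit, and four equally frequent characters carry 2 bits.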
5 Parse email headers and body content

Extract features from raw email files for text-based classification:

import email
import os
from email import policy

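def parse_email(filepath):
    with open(filepath, 'rb') as f:
        msg = email.message_from_binary_file(f, policy=policy.default)

    body = ''
    content_types = set()
    for part in msg.walk():
        ctype = part.get_content_type()
        content_types.add(ctype)
        # Keep plain-text parts; for single-part mail keep any text payload
        wanted = ctype == 'text/plain' or (
            not msg.is_multipart() and part.get_content_maintype() == 'text')
        if wanted:
            try:
                body += part.get_content()
            except (LookupError, UnicodeError):
                pass  # unknown or broken charset; skip this part

    subject = str(msg.get('Subject', ''))
    return {
        'subject': subject,
        'from': str(msg.get('From', '')),
        'reply_to': str(msg.get('Reply-To', '')),
        'body': body,
        'has_html': int('text/html' in content_types),
        'num_links': len(re.findall(r'https?://', body)),
        # Multipart alone does not imply an attachment; check each part's disposition
        'has_attachment': int(any(p.get_content_disposition() == 'attachment'
                                  for p in msg.walk())),
        'subject_urgency': int(bool(re.search(
            r'urgent|immediately|action required|verify now|suspended',
            subject.lower()))),
    }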

# Load and label emails (sorted for reproducibility; 'cmds' is an index file, not a message)
def list_emails(folder, limit=500):
    names = sorted(n for n in os.listdir(folder) if n != 'cmds')
    return [os.path.join(folder, n) for n in names[:limit]]

ham_emails = [parse_email(p) for p in list_emails('data/easy_ham')]
spam_emails = [parse_email(p) for p in list_emails('data/spam_2')]

for e in ham_emails: e['label'] = 0
for e in spam_emails: e['label'] = 1

email_df = pd.DataFrame(ham_emails + spam_emails)
print(f"Dataset: {len(ham_emails)} ham, {len(spam_emails)} spam")
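
The parser above records both `from` and `reply_to`, but no feature compares them yet. One possible extra signal, sketched here under the assumption that a Reply-To address on a different domain than From is worth flagging, is shown below. The function name `reply_to_mismatch` is hypothetical and not part of the lab's code.

```python
from email.utils import parseaddr

def reply_to_mismatch(from_header, reply_to_header):
    """Return 1 if Reply-To points at a different domain than From, else 0."""
    _, from_addr = parseaddr(from_header or '')
    _, reply_addr = parseaddr(reply_to_header or '')
    if not from_addr or not reply_addr:
        return 0  # one header is missing, so there is nothing to compare
    from_domain = from_addr.rpartition('@')[2].lower()
    reply_domain = reply_addr.rpartition('@')[2].lower()
    return int(from_domain != reply_domain)

if __name__ == '__main__':
    print(reply_to_mismatch('PayPal <service@paypal.com>', 'help@example.ru'))
    print(reply_to_mismatch('Alice <alice@example.com>', ''))
```

To try it on the lab data, the function could be applied row by row to the `from` and `reply_to` columns of `email_df` and the result stored as a new numeric column.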
Phase 3: Model Training & Evaluation
6 Build TF-IDF + Random Forest pipeline

Combine TF-IDF text features with structural email features into one sparse feature matrix, then train a Random Forest on it:

import scipy.sparse as sp

NUMERIC_COLS = ['has_html', 'num_links', 'has_attachment', 'subject_urgency']

def combine_text(frame):
    # Subject and body joined into one document per email
    return (frame['subject'].fillna('') + ' ' + frame['body'].fillna('')).tolist()

X = email_df.drop('label', axis=1)
y = email_df['label']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42)

tfidf = TfidfVectorizer(max_features=5000, ngram_range=(1,2), stop_words='english')

# Fit TF-IDF on the training text only, then append the numeric columns
text_features = tfidf.fit_transform(combine_text(X_train))
numeric_features = X_train[NUMERIC_COLS].values

X_combined = sp.hstack([text_features, numeric_features]).tocsr()

clf = RandomForestClassifier(n_estimators=200, class_weight='balanced', random_state=42)
clf.fit(X_combined, y_train)
print("Model trained!")
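
The code above assembles the feature matrix by hand, which means the test set has to repeat the same steps later. As an alternative sketch, the two feature groups could be wired into a single scikit-learn `Pipeline` with `ColumnTransformer`, so that training and test data pass through identical transforms. The helper names `add_text_column` and `build_pipeline` and the `text` column are assumptions introduced for illustration; the hyperparameters are copied from the step above.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline

NUMERIC_COLS = ['has_html', 'num_links', 'has_attachment', 'subject_urgency']

def add_text_column(df):
    """Return a copy with subject and body joined into one 'text' column."""
    out = df.copy()
    out['text'] = out['subject'].fillna('') + ' ' + out['body'].fillna('')
    return out

def build_pipeline():
    features = ColumnTransformer([
        # A string (not a list) selects a 1-D column, which TfidfVectorizer expects
        ('tfidf', TfidfVectorizer(max_features=5000, ngram_range=(1, 2),
                                  stop_words='english'), 'text'),
        # Numeric flags are passed through unchanged
        ('numeric', 'passthrough', NUMERIC_COLS),
    ])
    return Pipeline([
        ('features', features),
        ('rf', RandomForestClassifier(n_estimators=200, class_weight='balanced',
                                      random_state=42)),
    ])
```

With this shape, fitting would look like `build_pipeline().fit(add_text_column(X_train), y_train)`, and predictions on `add_text_column(X_test)` reuse the vocabulary learned during fitting.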
7 Evaluate model performance and ROC curve

Generate classification metrics and visualize the ROC curve. The positive class is SpamAssassin spam (label 1), used here as a stand-in for phishing:

from sklearn.metrics import roc_curve, auc

# Transform test set
text_test = tfidf.transform(
    [row['subject'] + ' ' + row['body'] for _, row in X_test.iterrows()])
numeric_test = X_test[['has_html','num_links','has_attachment','subject_urgency']].values
X_test_combined = sp.hstack([text_test, numeric_test])

y_pred = clf.predict(X_test_combined)
y_prob = clf.predict_proba(X_test_combined)[:, 1]

print(classification_report(y_test, y_pred, target_names=['Legitimate','Phishing']))
print(f"ROC-AUC: {roc_auc_score(y_test, y_prob):.4f}")

# Confusion matrix heatmap
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Reds',
            xticklabels=['Legit','Phishing'], yticklabels=['Legit','Phishing'])
plt.title('Confusion Matrix'); plt.tight_layout(); plt.savefig('confusion_matrix.png')

# ROC curve
fpr, tpr, _ = roc_curve(y_test, y_prob)
plt.figure(); plt.plot(fpr, tpr, label=f'AUC = {auc(fpr,tpr):.3f}')
plt.plot([0,1],[0,1],'--'); plt.xlabel('FPR'); plt.ylabel('TPR')
plt.title('ROC Curve — Phishing Classifier'); plt.legend(); plt.savefig('roc_curve.png')
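
The findings summary at the end of this lab asks for a false positive rate, which `classification_report` does not print. A minimal sketch of deriving it from the confusion matrix follows; the function name `rates_from_confusion` is hypothetical. It relies on the layout scikit-learn uses for binary labels 0 and 1, where rows are true classes and columns are predicted classes.

```python
def rates_from_confusion(cm):
    """Derive summary rates from a 2x2 matrix laid out as [[tn, fp], [fn, tp]]."""
    (tn, fp), (fn, tp) = cm

    def ratio(numerator, denominator):
        return numerator / denominator if denominator else 0.0

    return {
        'accuracy': ratio(tp + tn, tp + tn + fp + fn),
        'precision': ratio(tp, tp + fp),
        'recall': ratio(tp, tp + fn),
        # Share of legitimate emails wrongly flagged
        'false_positive_rate': ratio(fp, fp + tn),
    }

if __name__ == '__main__':
    # Made-up counts purely for illustration, not results from this lab
    print(rates_from_confusion([[90, 10], [5, 95]]))
```

Passing the `cm` array computed above works the same way, since each row unpacks into two counts.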
8 Analyze feature importance and top indicators

Identify which features drive phishing predictions most strongly:

# Get TF-IDF vocabulary + numeric column names
tfidf_features = tfidf.get_feature_names_out()
numeric_cols = ['has_html', 'num_links', 'has_attachment', 'subject_urgency']
all_features = list(tfidf_features) + numeric_cols

importances = clf.feature_importances_
top_idx = np.argsort(importances)[::-1][:20]

print("\nTop 20 phishing indicators:")
for i in top_idx:
    print(f"  {all_features[i]:<40} {importances[i]:.4f}")

# Plot top features
plt.figure(figsize=(10,6))
plt.barh([all_features[i] for i in top_idx[::-1]],
         [importances[i] for i in top_idx[::-1]], color='#0e5f7a')
plt.xlabel('Feature Importance'); plt.title('Top Phishing Detection Features')
plt.tight_layout(); plt.savefig('feature_importance.png')
Phase 4: Real-Time Detection & AI Enhancement
9 Build real-time phishing URL scanner

Create a command-line tool that scores URLs in real time:

cat > ~/phishing-lab/url_scanner.py << 'EOF'
import sys, re
from urllib.parse import urlparse

# Heuristic scoring for now. A URL model trained separately on the
# phishing URL dataset could be loaded here instead.
def score_url(url):
    parsed = urlparse(url)
    domain = parsed.netloc
    score = 0
    reasons = []

    if len(url) > 75:
        score += 20; reasons.append(f"Long URL ({len(url)} chars)")
    if re.match(r'\d{1,3}(\.\d{1,3}){3}(:\d+)?$', domain):
        score += 30; reasons.append("IP address as domain")
    if domain.count('.') > 3:
        score += 15; reasons.append(f"Excessive subdomains ({domain.count('.')})")
    if '@' in url:
        score += 25; reasons.append("@ symbol in URL")
    if re.search(r'login|signin|account|verify|secure|bank', url.lower()):
        score += 20; reasons.append("Suspicious keywords")
    if parsed.scheme != 'https':
        score += 10; reasons.append("Non-HTTPS")
    if domain.count('-') > 2:
        score += 15; reasons.append("Multiple hyphens in domain")

    score = min(score, 100)  # weights can sum past 100; cap to match the /100 display
    verdict = "PHISHING" if score >= 50 else "SUSPICIOUS" if score >= 25 else "LEGITIMATE"
    return {'score': score, 'verdict': verdict, 'reasons': reasons}

if __name__ == '__main__':
    url = sys.argv[1] if len(sys.argv) > 1 else input("Enter URL to scan: ")
    result = score_url(url)
    print(f"\nURL: {url}")
    print(f"Score: {result['score']}/100")
    print(f"Verdict: {result['verdict']}")
    if result['reasons']:
        print("Indicators:")
        for r in result['reasons']: print(f"  - {r}")
EOF
python3 ~/phishing-lab/url_scanner.py "http://paypa1-verify.ru/login/account?id=12345"
10 Test against live phishing samples from PhishTank

Fetch recent verified phishing URLs from the PhishTank data feed and batch-test your scanner:

cat > ~/phishing-lab/batch_test.py << 'EOF'
import requests, json
from url_scanner import score_url

# PhishTank verified phishing URLs. The feed is large and rate limited, and
# requests without an app key may be rejected, so fail loudly on HTTP errors.
resp = requests.get('http://data.phishtank.com/data/online-valid.json',
                    headers={'User-Agent': 'phishing-lab-research/1.0'},
                    timeout=30)
resp.raise_for_status()
phishing_urls = [entry['url'] for entry in resp.json()[:50]]

results = {'phishing_detected': 0, 'missed': 0, 'scores': []}
for url in phishing_urls:
    r = score_url(url)
    results['scores'].append(r['score'])
    if r['verdict'] in ('PHISHING', 'SUSPICIOUS'):
        results['phishing_detected'] += 1
    else:
        results['missed'] += 1
        print(f"MISSED: {url} (score={r['score']})")

total = len(phishing_urls)  # may be fewer than 50 if the feed is short
print(f"\n--- Batch Results ---")
print(f"Detected: {results['phishing_detected']}/{total}")
print(f"Missed:   {results['missed']}/{total}")
if total:
    print(f"Avg score: {sum(results['scores'])/total:.1f}")
EOF
cd ~/phishing-lab && python3 batch_test.py
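
The batch test above only feeds the scanner known phishing URLs, so it measures detection rate but says nothing about false alarms. A hedged sketch of a two-sided check follows. The function `evaluate_scanner`, the benign URL list, and the toy scorer are all assumptions for illustration; in the lab, `score_url` from `url_scanner.py` would be passed in, and the default threshold of 25 matches the scanner's SUSPICIOUS cutoff.

```python
def evaluate_scanner(score_fn, phishing_urls, benign_urls, flag_at=25):
    """Share of phishing URLs flagged, and share of benign URLs flagged by mistake."""
    def flagged(urls):
        return sum(1 for u in urls if score_fn(u)['score'] >= flag_at)

    def rate(hits, total):
        return hits / total if total else 0.0

    return {
        'detection_rate': rate(flagged(phishing_urls), len(phishing_urls)),
        'false_positive_rate': rate(flagged(benign_urls), len(benign_urls)),
    }

# Toy stand-in for score_url so the sketch runs on its own (hypothetical)
def toy_score(url):
    return {'score': 30 if 'login' in url else 0}

if __name__ == '__main__':
    phish = ['http://a.test/login', 'http://b.test/x']
    benign = ['https://c.test/login', 'https://d.test/', 'https://e.test/', 'https://f.test/']
    print(evaluate_scanner(toy_score, phish, benign))
```

Keyword heuristics such as "login" tend to fire on legitimate sign-in pages too, which is why a benign sample is worth including before trusting the detection rate.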
11 Generate YARA rules from phishing email patterns

Convert your ML feature findings into actionable YARA rules for email gateways:

cat > ~/phishing-lab/phishing_rules.yar << 'EOF'
rule Phishing_Urgency_Language {
    meta:
        description = "Detects urgency language common in phishing emails"
        author = "CyberSec Pro Academy - L05"
        severity = "MEDIUM"
    strings:
        $u1 = "action required" nocase
        $u2 = "verify your account" nocase
        $u3 = "suspended" nocase
        $u4 = "immediately" nocase
        $u5 = "unusual activity" nocase
        $u6 = "click here to confirm" nocase
    condition:
        2 of ($u*)
}

rule Phishing_Credential_Harvest {
    meta:
        description = "Phishing page requesting credentials"
        severity = "HIGH"
    strings:
        $c1 = "password" nocase
        $c2 = "username" nocase
        $c3 = "login" nocase
        $link = /https?:\/\/[^\s"']{0,20}(paypal|amazon|microsoft|apple|google)[^\s"']{0,50}\.(ru|cn|tk|ml|ga)/
    condition:
        $link and (2 of ($c*))
}

rule Phishing_Fake_Brand_URL {
    meta:
        description = "URL contains brand name in non-official domain"
    strings:
        $b = /https?:\/\/[a-z0-9-]{0,30}(paypal|amazon|netflix|microsoft|apple|google|facebook)[a-z0-9-]{0,30}\.(xyz|top|tk|ml|ga|cf|gq|pw)/
    condition:
        $b
}
EOF
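
The condition `2 of ($u*)` in the first rule fires when at least two distinct urgency strings appear anywhere in the scanned data; repeating one string many times does not count twice. The Python sketch below approximates that logic so the threshold can be reasoned about without running YARA. The names `count_matching_phrases` and `urgency_rule_fires` are hypothetical, and lowercasing the text is only a rough stand-in for YARA's `nocase` modifier.

```python
URGENCY_PHRASES = [
    "action required",
    "verify your account",
    "suspended",
    "immediately",
    "unusual activity",
    "click here to confirm",
]

def count_matching_phrases(text, phrases=URGENCY_PHRASES):
    """Number of distinct phrases found in the text, ignoring case."""
    lowered = text.lower()
    return sum(1 for phrase in phrases if phrase in lowered)

def urgency_rule_fires(text, minimum=2):
    """Approximate the rule condition: at least `minimum` distinct phrases present."""
    return count_matching_phrases(text) >= minimum

if __name__ == '__main__':
    print(urgency_rule_fires("Action Required: your account is suspended"))
    print(urgency_rule_fires("Your order shipped immediately, immediately!"))
```

Raising `minimum` trades missed phishing emails for fewer false alarms on ordinary mail that happens to contain one urgent-sounding word.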
# Test against spam corpus
pip install yara-python
cd ~/phishing-lab && python3 -c "
import yara, os
rules = yara.compile(filepath='phishing_rules.yar')
for f in sorted(os.listdir('data/spam_2'))[:20]:
    try:
        matches = rules.match(f'data/spam_2/{f}')
    except yara.Error as err:
        print(f'{f}: scan failed ({err})')
        continue
    if matches: print(f'{f}: {[m.rule for m in matches]}')
"
12 Document findings and export detection report

Compile your findings into a detection engineering report. Ask the AI analyst for help structuring your recommendations.

Lab Findings Summary

Metric | Value
Model Accuracy |
ROC-AUC Score |
False Positive Rate |
Top Phishing Indicator |
PhishTank Detection Rate |
YARA Rules Created |

Next: Lab L06 — Network Anomaly Detection

Use ML-based anomaly detection to identify suspicious traffic patterns in network flow data.
