Build a machine learning phishing classifier using Python, scikit-learn, and NLP techniques. Analyze email headers, URLs, and body content to detect phishing campaigns at scale — then use AI to refine your model and explain predictions.
On your Kali Linux VM, set up a Python virtual environment and install required packages:
# System packages: pip, venv support, and the distro Jupyter package.
sudo apt update && sudo apt install -y python3-pip python3-venv jupyter-notebook

# Create and activate an isolated environment for the lab.
python3 -m venv ~/phishing-lab
source ~/phishing-lab/bin/activate

# "notebook" is installed inside the venv as well, so that the Jupyter kernel
# runs the venv's interpreter and can import the packages installed here.
pip install notebook scikit-learn pandas numpy nltk matplotlib seaborn requests beautifulsoup4 imbalanced-learn
Verify installation:
# Prints the confirmation message only if all three imports succeed.
python3 -c "import sklearn, pandas, nltk; print('All packages installed')"
Download the phishing URL feature dataset (GregaVrbancic/Phishing-Dataset) and the SpamAssassin public email corpus:
# -p: do not fail when the directory already exists (safe to re-run).
mkdir -p ~/phishing-lab/data && cd ~/phishing-lab/data

# Phishing URLs dataset (PhishTank-derived)
wget -O phishing_urls.csv \
  "https://web.archive.org/web/2024/https://raw.githubusercontent.com/GregaVrbancic/Phishing-Dataset/master/dataset_full.csv"

# SpamAssassin public corpus (legitimate + spam emails)
wget https://spamassassin.apache.org/old/publiccorpus/20030228_easy_ham.tar.bz2
wget https://spamassassin.apache.org/old/publiccorpus/20030228_spam_2.tar.bz2
tar -xjf 20030228_easy_ham.tar.bz2
tar -xjf 20030228_spam_2.tar.bz2
Start Jupyter Notebook in your virtual environment:
source ~/phishing-lab/bin/activate
cd ~/phishing-lab
# NOTE: --ip=0.0.0.0 makes the server listen on every network interface,
# not only on localhost. The trailing & runs it in the background.
jupyter notebook --ip=0.0.0.0 --port=8888 --no-browser &
# Access at http://localhost:8888 in browser
Create a new notebook called phishing_detection.ipynb and import libraries:
# Standard library
import re
from urllib.parse import urlparse

# Third-party: data handling, NLP, modelling, plotting
import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline

# Fetch the NLTK resources named below.
nltk.download(['stopwords', 'punkt', 'wordnet'])
Implement URL feature extraction — phishing URLs have telltale structural patterns:
def extract_url_features(url):
    """Derive structural and lexical features from a single URL.

    Args:
        url: The URL to analyse. ``None`` is treated as an empty string and
            any other non-string value is converted with ``str()``.

    Returns:
        dict mapping feature name to a numeric value. All flag features
        (``has_ip``, ``has_https``, ``has_port``, ``suspicious_keywords``)
        are 0/1 integers; ``url_entropy`` is the Shannon entropy (bits per
        character) of the URL string.
    """
    from collections import Counter  # local import keeps this block self-contained

    url = '' if url is None else str(url)
    try:
        parsed = urlparse(url)
        if not parsed.netloc and '://' not in url:
            # Scheme-less input such as "example.com/login" would otherwise
            # be parsed as a bare path, leaving every domain feature empty.
            parsed = urlparse('//' + url)
    except ValueError:
        # Malformed authority (e.g. unbalanced IPv6 brackets): keep the
        # string-level features and treat the structural parts as empty,
        # so one bad row does not abort a whole batch.
        parsed = urlparse('')

    domain = parsed.netloc
    # hostname excludes any "user:pass@" prefix and ":port" suffix, both of
    # which would otherwise distort the host-based checks below.
    host = parsed.hostname or ''
    path = parsed.path

    try:
        has_port = parsed.port is not None
    except ValueError:
        # A port is present but not a valid number; still an explicit port.
        has_port = True

    # Character frequencies in one pass instead of one str.count per character.
    length = len(url)
    if length:
        entropy = -sum((n / length) * np.log2(n / length)
                       for n in Counter(url).values())
    else:
        entropy = 0.0

    features = {
        'url_length': length,
        'domain_length': len(domain),
        'path_length': len(path),
        'num_dots': url.count('.'),
        'num_hyphens': url.count('-'),
        'num_at': url.count('@'),
        'num_subdomains': host.count('.'),
        # fullmatch: "1.2.3.4.evil.com" is a hostname, not an IP literal.
        'has_ip': int(bool(re.fullmatch(r'\d{1,3}(?:\.\d{1,3}){3}', host))),
        'has_https': int(parsed.scheme == 'https'),
        'has_port': int(has_port),
        'path_depth': len([p for p in path.split('/') if p]),
        'num_params': len(parsed.query.split('&')) if parsed.query else 0,
        'suspicious_keywords': int(bool(re.search(
            r'login|signin|account|update|verify|secure|bank|paypal|amazon',
            url.lower()))),
        'url_entropy': entropy,
    }
    return features
# Apply to dataset
df = pd.read_csv('data/phishing_urls.csv')
if 'url' in df.columns:
    # Raw URLs are available: derive the structural features ourselves.
    # Missing cells become '' so features_df stays row-aligned with df.
    features_df = pd.DataFrame(
        [extract_url_features(u) for u in df['url'].fillna('').astype(str)])
else:
    # NOTE(review): the downloaded CSV looks like a table of precomputed
    # URL features without a raw 'url' column - confirm against the file.
    # In that case use the table as-is instead of failing with a KeyError.
    features_df = df
print(features_df.describe())
Extract features from raw email files for text-based classification:
import email
import os
from email import policy
def _part_text(part):
    """Return the decoded text of a ``text/*`` MIME part ('' for other types)."""
    if part.get_content_maintype() != 'text':
        # Non-text content decodes to bytes, which the str regexes in
        # parse_email cannot search.
        return ''
    try:
        return part.get_content()
    except (LookupError, UnicodeError):
        # Unknown or mislabeled charset: decode leniently instead of
        # aborting the whole corpus load on one message.
        raw = part.get_payload(decode=True) or b''
        return raw.decode('utf-8', errors='replace')


def parse_email(filepath):
    """Parse one raw email file into a flat feature dictionary.

    Args:
        filepath: Path to a file containing a single RFC 822 message.

    Returns:
        dict with the header strings ``subject``, ``from`` and ``reply_to``,
        the plain-text ``body``, and the integer features ``has_html``
        (any text/html part), ``num_links`` (http/https occurrences in the
        body), ``has_attachment`` (any part with an attachment disposition)
        and ``subject_urgency`` (urgency phrase in the subject).

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(filepath, 'rb') as f:
        msg = email.message_from_binary_file(f, policy=policy.default)

    # walk() yields the message itself for single-part mail, so the same
    # list serves both the multipart and the single-part case.
    parts = list(msg.walk())
    if msg.is_multipart():
        body = ''.join(_part_text(p) for p in parts
                       if p.get_content_type() == 'text/plain')
    else:
        body = _part_text(msg)

    subject = str(msg.get('Subject', ''))
    return {
        'subject': subject,
        'from': str(msg.get('From', '')),
        'reply_to': str(msg.get('Reply-To', '')),
        'body': body,
        # Inspect declared part types rather than searching the serialized
        # message, which also matched the text "text/html" inside bodies.
        'has_html': int(any(p.get_content_type() == 'text/html' for p in parts)),
        'num_links': len(re.findall(r'https?://', body)),
        # multipart/alternative (plain + HTML) is not an attachment; look
        # for an explicit attachment disposition instead.
        'has_attachment': int(any(
            p.get_content_disposition() == 'attachment' for p in parts)),
        'subject_urgency': int(bool(re.search(
            r'urgent|immediately|action required|verify now|suspended',
            subject.lower()))),
    }
# Load and label emails
# sorted() makes the 500-message sample reproducible, since os.listdir
# returns entries in arbitrary order. Only regular files are parsed.
# NOTE(review): 'cmds' is skipped on the assumption that the corpus tarballs
# ship an index file of that name next to the messages - confirm.
ham_files = [f for f in sorted(os.listdir('data/easy_ham'))
             if f != 'cmds' and os.path.isfile(f'data/easy_ham/{f}')][:500]
spam_files = [f for f in sorted(os.listdir('data/spam_2'))
              if f != 'cmds' and os.path.isfile(f'data/spam_2/{f}')][:500]
ham_emails = [parse_email(f'data/easy_ham/{f}') for f in ham_files]
spam_emails = [parse_email(f'data/spam_2/{f}') for f in spam_files]
# Label convention: 0 = ham (legitimate), 1 = spam.
for e in ham_emails:
    e['label'] = 0
for e in spam_emails:
    e['label'] = 1
email_df = pd.DataFrame(ham_emails + spam_emails)
print(f"Dataset: {len(ham_emails)} ham, {len(spam_emails)} spam")
Combine TF-IDF text features with structural email features in a unified pipeline:
from sklearn.pipeline import FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
import scipy.sparse as sp
class TextExtractor(BaseEstimator, TransformerMixin):
    """Stateless transformer that joins each row's subject and body text."""

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return a list holding ``subject + ' ' + body`` for every row of X."""
        return [subj + ' ' + text
                for subj, text in zip(X['subject'], X['body'])]
class NumericExtractor(BaseEstimator, TransformerMixin):
    """Stateless transformer that selects the numeric email feature columns."""

    # Columns returned by transform(), in this order.
    num_cols = ['has_html', 'num_links', 'has_attachment', 'subject_urgency']

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return the ``num_cols`` columns of X as an array."""
        selected = X[self.num_cols]
        return selected.values
X = email_df.drop('label', axis=1)
y = email_df['label']
# Seed the split as well as the classifier, so the reported metrics are
# reproducible from run to run.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42)
tfidf = TfidfVectorizer(max_features=5000, ngram_range=(1,2), stop_words='english')
from sklearn.preprocessing import FunctionTransformer
# This pipeline object is not used in the steps below; the classifier is
# fitted directly on the manually combined feature matrix.
pipeline = Pipeline([
    ('rf', RandomForestClassifier(n_estimators=200, class_weight='balanced', random_state=42))
])
# Fit text features separately then combine
text_features = tfidf.fit_transform(
    [row['subject'] + ' ' + row['body'] for _, row in X_train.iterrows()])
numeric_features = X_train[['has_html','num_links','has_attachment','subject_urgency']].values
# Column order: TF-IDF terms first, then the four numeric features.
X_combined = sp.hstack([text_features, numeric_features])
from sklearn.ensemble import RandomForestClassifier
clf = RandomForestClassifier(n_estimators=200, class_weight='balanced', random_state=42)
clf.fit(X_combined, y_train)
print("Model trained!")
Generate classification metrics and visualize the ROC curve:
from sklearn.metrics import roc_curve, auc

# Transform test set with the vectorizer fitted on the training data
text_test = tfidf.transform(
    [subj + ' ' + text for subj, text in zip(X_test['subject'], X_test['body'])])
numeric_test = X_test[['has_html','num_links','has_attachment','subject_urgency']].values
X_test_combined = sp.hstack([text_test, numeric_test])

# Hard predictions plus the probability of the positive class (label 1)
y_pred = clf.predict(X_test_combined)
y_prob = clf.predict_proba(X_test_combined)[:, 1]

print(classification_report(y_test, y_pred, target_names=['Legitimate','Phishing']))
print(f"ROC-AUC: {roc_auc_score(y_test, y_prob):.4f}")

# Confusion matrix heatmap
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Reds',
    xticklabels=['Legit','Phishing'],
    yticklabels=['Legit','Phishing'],
)
plt.title('Confusion Matrix')
plt.tight_layout()
plt.savefig('confusion_matrix.png')

# ROC curve, drawn on a fresh figure with the chance diagonal for reference
fpr, tpr, _ = roc_curve(y_test, y_prob)
plt.figure()
plt.plot(fpr, tpr, label=f'AUC = {auc(fpr,tpr):.3f}')
plt.plot([0,1],[0,1],'--')
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.title('ROC Curve — Phishing Classifier')
plt.legend()
plt.savefig('roc_curve.png')
Identify which features drive phishing predictions most strongly:
# Get TF-IDF vocabulary + numeric column names
# (same column order as the combined training matrix: text terms, then numerics)
tfidf_features = tfidf.get_feature_names_out()
numeric_cols = ['has_html', 'num_links', 'has_attachment', 'subject_urgency']
all_features = [*tfidf_features, *numeric_cols]

# Indices of the 20 largest importances, highest first
importances = clf.feature_importances_
top_idx = np.argsort(importances)[::-1][:20]

print("\nTop 20 phishing indicators:")
for i in top_idx:
    print(f" {all_features[i]:<40} {importances[i]:.4f}")

# Plot top features (reversed, so the strongest feature ends up at the top)
plot_order = top_idx[::-1]
bar_labels = [all_features[i] for i in plot_order]
bar_values = [importances[i] for i in plot_order]
plt.figure(figsize=(10,6))
plt.barh(bar_labels, bar_values, color='#0e5f7a')
plt.xlabel('Feature Importance')
plt.title('Top Phishing Detection Features')
plt.tight_layout()
plt.savefig('feature_importance.png')
Create a command-line tool that scores URLs in real time:
cat > ~/phishing-lab/url_scanner.py << 'EOF'
import sys, pickle, re, numpy as np
from urllib.parse import urlparse
# Load trained URL model (train separately on UCI dataset)
# For now use heuristic scoring
def score_url(url):
parsed = urlparse(url)
domain = parsed.netloc
score = 0
reasons = []
if len(url) > 75:
score += 20; reasons.append(f"Long URL ({len(url)} chars)")
if re.match(r'\d+\.\d+\.\d+\.\d+', domain):
score += 30; reasons.append("IP address as domain")
if domain.count('.') > 3:
score += 15; reasons.append(f"Excessive subdomains ({domain.count('.')})")
if '@' in url:
score += 25; reasons.append("@ symbol in URL")
if re.search(r'login|signin|account|verify|secure|bank', url.lower()):
score += 20; reasons.append("Suspicious keywords")
if parsed.scheme != 'https':
score += 10; reasons.append("Non-HTTPS")
if '-' in domain and domain.count('-') > 2:
score += 15; reasons.append("Multiple hyphens in domain")
verdict = "PHISHING" if score >= 50 else "SUSPICIOUS" if score >= 25 else "LEGITIMATE"
return {'score': score, 'verdict': verdict, 'reasons': reasons}
if __name__ == '__main__':
    # Take the URL from the first command-line argument, else prompt for it.
    if len(sys.argv) > 1:
        url = sys.argv[1]
    else:
        url = input("Enter URL to scan: ")

    result = score_url(url)
    indicators = result['reasons']

    print(f"\nURL: {url}")
    print(f"Score: {result['score']}/100")
    print(f"Verdict: {result['verdict']}")
    if indicators:
        print("Indicators:")
        for reason in indicators:
            print(f" - {reason}")
EOF
python3 ~/phishing-lab/url_scanner.py "http://paypa1-verify.ru/login/account?id=12345"
Fetch recent phishing URLs from the PhishTank API and batch-test your scanner:
cat > ~/phishing-lab/batch_test.py << 'EOF'
import requests, json
from url_scanner import score_url
# PhishTank verified phishing URLs (free API, no key required for limited use)
resp = requests.get('http://data.phishtank.com/data/online-valid.json',
                    headers={'User-Agent': 'phishing-lab-research/1.0'},
                    timeout=30)
# Surface HTTP errors directly instead of failing later with an opaque
# JSON decoding error on an error page.
resp.raise_for_status()
phishing_urls = [entry['url'] for entry in resp.json()[:50]]
# The feed may return fewer than 50 entries; report against the real count.
total = len(phishing_urls)
if not total:
    raise SystemExit("PhishTank returned no URLs; nothing to test.")
results = {'phishing_detected': 0, 'missed': 0, 'scores': []}
for url in phishing_urls:
    r = score_url(url)
    results['scores'].append(r['score'])
    # Every URL in this feed is phishing, so a LEGITIMATE verdict is a miss.
    if r['verdict'] in ('PHISHING', 'SUSPICIOUS'):
        results['phishing_detected'] += 1
    else:
        results['missed'] += 1
        print(f"MISSED: {url} (score={r['score']})")
print(f"\n--- Batch Results ---")
print(f"Detected: {results['phishing_detected']}/{total}")
print(f"Missed: {results['missed']}/{total}")
print(f"Avg score: {sum(results['scores'])/total:.1f}")
EOF
cd ~/phishing-lab && python3 batch_test.py
Convert your ML feature findings into actionable YARA rules for email gateways:
cat > ~/phishing-lab/phishing_rules.yar << 'EOF'
// Flags content that contains at least two distinct urgency phrases.
rule Phishing_Urgency_Language {
    meta:
        description = "Detects urgency language common in phishing emails"
        author = "CyberSec Pro Academy - L05"
        severity = "MEDIUM"
    strings:
        // nocase: each phrase is matched regardless of letter case
        $u1 = "action required" nocase
        $u2 = "verify your account" nocase
        $u3 = "suspended" nocase
        $u4 = "immediately" nocase
        $u5 = "unusual activity" nocase
        $u6 = "click here to confirm" nocase
    condition:
        // requires two different $u strings, not two hits of the same one
        2 of ($u*)
}
// Flags content that pairs credential vocabulary with a link whose text
// contains a well-known brand name followed by one of the listed TLDs.
rule Phishing_Credential_Harvest {
    meta:
        description = "Phishing page requesting credentials"
        severity = "HIGH"
    strings:
        $c1 = "password" nocase
        $c2 = "username" nocase
        $c3 = "login" nocase
        // nocase: scheme and host names are case-insensitive, so a link
        // written as "HTTP://PayPal..." must match as well.
        $link = /https?:\/\/[^\s"']{0,20}(paypal|amazon|microsoft|apple|google)[^\s"']{0,50}\.(ru|cn|tk|ml|ga)/ nocase
    condition:
        // the link plus at least two different credential words
        $link and (2 of ($c*))
}
// Flags URLs whose first host label embeds a brand name and is directly
// followed by one of the listed TLDs.
rule Phishing_Fake_Brand_URL {
    meta:
        description = "URL contains brand name in non-official domain"
    strings:
        // nocase: host names are case-insensitive, so upper- or mixed-case
        // links must not slip past the lower-case character classes.
        $b = /https?:\/\/[a-z0-9-]{0,30}(paypal|amazon|netflix|microsoft|apple|google|facebook)[a-z0-9-]{0,30}\.(xyz|top|tk|ml|ga|cf|gq|pw)/ nocase
    condition:
        $b
}
EOF
# Test against spam corpus
pip install yara-python
python3 -c "
import os
import yara

rules = yara.compile('phishing_rules.yar')
# sorted() gives the same 20 samples on every run
for name in sorted(os.listdir('data/spam_2'))[:20]:
    try:
        matches = rules.match(f'data/spam_2/{name}')
    except yara.Error as exc:
        # Report samples that cannot be scanned instead of hiding them
        print(f'{name}: scan failed ({exc})')
        continue
    if matches:
        print(f'{name}: {[m.rule for m in matches]}')
"
Compile your findings into a detection engineering report. Ask the AI analyst for help structuring your recommendations.
| Metric | Value |
|---|---|
| Model Accuracy | |
| ROC-AUC Score | |
| False Positive Rate | |
| Top Phishing Indicator | |
| PhishTank Detection Rate | |
| YARA Rules Created | |