Build a Python-based Threat Intelligence Platform that aggregates IOCs from AlienVault OTX, AbuseIPDB, VirusTotal, and Shodan. Automate IOC enrichment, build a local threat feed, generate STIX bundles, and integrate threat data into your detection pipeline.
Register for free API keys at these threat intelligence platforms:
| Platform | URL | Free Tier |
|---|---|---|
| AlienVault OTX | otx.alienvault.com | Unlimited read, 500/day write |
| AbuseIPDB | abuseipdb.com | 1,000 checks/day |
| VirusTotal | virustotal.com | 4 lookups/min, 500/day |
| Shodan | shodan.io | Free (limited) |
| MalwareBazaar | bazaar.abuse.ch | Unlimited |
pip3 install OTXv2 requests python-stix2 pandas mkdir ~/tip-lab && cd ~/tip-lab # Store API keys securely (never hardcode) cat > ~/.tip_keys << 'EOF' OTX_KEY=your_otx_api_key_here ABUSEIPDB_KEY=your_abuseipdb_key_here VT_KEY=your_virustotal_key_here SHODAN_KEY=your_shodan_key_here EOF chmod 600 ~/.tip_keys source ~/.tip_keys echo "API keys configured"
cat > ~/tip-lab/ioc_model.py << 'EOF'
import dataclasses, datetime, json
from typing import Optional, List
@dataclasses.dataclass
class IOC:
value: str
ioc_type: str # ip, domain, url, hash_md5, hash_sha256, email
source: str
first_seen: Optional[str] = None
last_seen: Optional[str] = None
confidence: int = 50 # 0-100
malware_families: List[str] = dataclasses.field(default_factory=list)
tags: List[str] = dataclasses.field(default_factory=list)
pulse_count: int = 0 # OTX pulses referencing this
abuse_score: int = 0 # AbuseIPDB confidence score
vt_positives: int = 0 # VirusTotal detection count
vt_total: int = 0 # VirusTotal engine count
threat_actor: Optional[str] = None
mitre_techniques: List[str] = dataclasses.field(default_factory=list)
enriched: bool = False
created_at: str = dataclasses.field(
default_factory=lambda: datetime.datetime.utcnow().isoformat())
def risk_score(self):
"""Composite risk score 0-100."""
score = 0
if self.abuse_score > 75: score += 35
elif self.abuse_score > 25: score += 20
if self.vt_total > 0:
vt_ratio = self.vt_positives / self.vt_total
score += int(vt_ratio * 40)
if self.pulse_count >= 5: score += 25
elif self.pulse_count >= 2: score += 15
return min(score, 100)
def to_dict(self):
return dataclasses.asdict(self)
@classmethod
def from_dict(cls, d):
return cls(**d)
if __name__ == '__main__':
# Test
ioc = IOC(value="185.234.219.44", ioc_type="ip", source="manual",
abuse_score=95, vt_positives=35, vt_total=94, pulse_count=12,
tags=["c2", "ransomware"], malware_families=["BlackCat"])
print(f"IOC: {ioc.value}")
print(f"Risk Score: {ioc.risk_score()}/100")
print(json.dumps(ioc.to_dict(), indent=2))
EOF
python3 ioc_model.py
cat > ~/tip-lab/otx_enricher.py << 'EOF'
import os
from OTXv2 import OTXv2, IndicatorTypes
from ioc_model import IOC
OTX_KEY = os.environ.get('OTX_KEY', '')
otx = OTXv2(OTX_KEY)
def enrich_ip_otx(ip_address):
"""Fetch OTX intelligence for an IP address."""
try:
# General info
general = otx.get_indicator_details_full(IndicatorTypes.IPv4, ip_address)
base = general.get('general', {})
# Pulse (threat report) info
pulse_info = base.get('pulse_info', {})
pulses = pulse_info.get('pulses', [])
# Extract malware families and tags
families = []
tags = []
techniques = []
for pulse in pulses:
families.extend(pulse.get('malware_families', []))
tags.extend(pulse.get('tags', []))
# Some pulses include ATT&CK technique references
for ref in pulse.get('references', []):
if 'attack.mitre.org/techniques' in ref:
tid = ref.split('/')[-1]
techniques.append(tid)
ioc = IOC(
value=ip_address,
ioc_type='ip',
source='AlienVault OTX',
pulse_count=pulse_info.get('count', 0),
malware_families=list(set(families))[:5],
tags=list(set(tags))[:10],
mitre_techniques=list(set(techniques)),
first_seen=base.get('first_seen'),
last_seen=base.get('last_seen'),
enriched=True,
)
print(f"\n[OTX] {ip_address}")
print(f" Pulses: {ioc.pulse_count}")
print(f" Families: {ioc.malware_families}")
print(f" Tags: {ioc.tags[:5]}")
return ioc
except Exception as e:
print(f"OTX error for {ip_address}: {e}")
return None
if __name__ == '__main__':
test_ips = ["185.234.219.44", "45.142.212.100"]
for ip in test_ips:
enrich_ip_otx(ip)
EOF
cd ~/tip-lab && python3 otx_enricher.py
cat > ~/tip-lab/abuseipdb_enricher.py << 'EOF'
import os, requests
ABUSEIPDB_KEY = os.environ.get('ABUSEIPDB_KEY', '')
def enrich_ip_abuseipdb(ip_address, max_age_days=90):
"""Query AbuseIPDB for abuse reports."""
headers = {'Key': ABUSEIPDB_KEY, 'Accept': 'application/json'}
params = {'ipAddress': ip_address, 'maxAgeInDays': max_age_days, 'verbose': True}
resp = requests.get('https://api.abuseipdb.com/api/v2/check',
headers=headers, params=params)
if resp.status_code != 200:
print(f"AbuseIPDB error: {resp.status_code}")
return None
data = resp.json().get('data', {})
reports = data.get('reports', [])
result = {
'ip': ip_address,
'abuse_confidence': data.get('abuseConfidenceScore', 0),
'total_reports': data.get('totalReports', 0),
'country': data.get('countryCode', 'Unknown'),
'isp': data.get('isp', 'Unknown'),
'is_tor': data.get('isTor', False),
'is_public': data.get('isPublic', True),
'last_reported': data.get('lastReportedAt', ''),
'categories': list(set(
cat for r in reports for cat in r.get('categories', []))),
'distinct_reporters': data.get('numDistinctUsers', 0),
}
# AbuseIPDB category codes
cat_names = {18: 'Brute-Force', 14: 'Port Scan', 3: 'Fraud',
21: 'Web App Attack', 15: 'Hacking', 4: 'DDoS Attack'}
print(f"\n[AbuseIPDB] {ip_address}")
print(f" Confidence: {result['abuse_confidence']}%")
print(f" Reports: {result['total_reports']} from {result['distinct_reporters']} users")
print(f" Country/ISP: {result['country']} / {result['isp']}")
print(f" Tor exit: {result['is_tor']}")
print(f" Categories: {[cat_names.get(c, str(c)) for c in result['categories']]}")
return result
if __name__ == '__main__':
ips = ["185.234.219.44", "8.8.8.8"]
for ip in ips:
enrich_ip_abuseipdb(ip)
EOF
python3 abuseipdb_enricher.py
cat > ~/tip-lab/vt_enricher.py << 'EOF'
import os, requests, time
VT_KEY = os.environ.get('VT_KEY', '')
def enrich_hash_vt(file_hash):
"""Query VirusTotal for file hash analysis."""
headers = {'x-apikey': VT_KEY}
resp = requests.get(
f'https://www.virustotal.com/api/v3/files/{file_hash}',
headers=headers)
if resp.status_code == 404:
print(f"[VT] {file_hash[:16]}... — NOT FOUND")
return None
elif resp.status_code == 200:
data = resp.json()['data']['attributes']
stats = data.get('last_analysis_stats', {})
results = data.get('last_analysis_results', {})
# Get detecting engine names
detecting = [eng for eng, res in results.items()
if res.get('category') == 'malicious']
result = {
'hash': file_hash,
'name': data.get('meaningful_name', 'Unknown'),
'type': data.get('type_description', 'Unknown'),
'size': data.get('size', 0),
'malicious': stats.get('malicious', 0),
'suspicious': stats.get('suspicious', 0),
'undetected': stats.get('undetected', 0),
'total_engines': sum(stats.values()),
'first_submission': data.get('first_submission_date', ''),
'popular_names': data.get('popular_threat_name', ''),
'signature': data.get('signature_info', {}).get('subject', ''),
'detecting_engines': detecting[:10],
}
print(f"\n[VirusTotal] {file_hash[:16]}...")
print(f" Name: {result['name']}")
print(f" Detections: {result['malicious']}/{result['total_engines']}")
print(f" Type: {result['type']}")
print(f" Top detectors: {result['detecting_engines'][:5]}")
time.sleep(15) # VT rate limit: 4/min on free tier
return result
def enrich_hash_bazaar(file_hash):
"""Query MalwareBazaar (no API key needed)."""
resp = requests.post('https://mb-api.abuse.ch/api/v1/',
data={'query': 'get_info', 'hash': file_hash})
data = resp.json()
if data.get('query_status') == 'hash_not_found':
return None
sample = data.get('data', [{}])[0]
print(f"\n[MalwareBazaar] {file_hash[:16]}...")
print(f" Family: {sample.get('signature', 'Unknown')}")
print(f" First seen: {sample.get('first_seen', '')}")
print(f" Tags: {sample.get('tags', [])}")
return sample
if __name__ == '__main__':
# Known malware hashes (WannaCry, Mirai)
test_hashes = [
"db349b97c37d22f5ea1d1841e3c89eb4", # WannaCry MD5
]
for h in test_hashes:
enrich_hash_vt(h)
enrich_hash_bazaar(h)
EOF
python3 vt_enricher.py
cat > ~/tip-lab/enrich_iocs.py << 'EOF'
import json, time, os, sys
import pandas as pd
from ioc_model import IOC
from otx_enricher import enrich_ip_otx
from abuseipdb_enricher import enrich_ip_abuseipdb
from vt_enricher import enrich_hash_vt, enrich_hash_bazaar
def enrich_ioc(ioc_value, ioc_type):
"""Enriches an IOC across all available sources."""
ioc = IOC(value=ioc_value, ioc_type=ioc_type, source='manual')
if ioc_type == 'ip':
# AbuseIPDB
abuse = enrich_ip_abuseipdb(ioc_value)
if abuse:
ioc.abuse_score = abuse['abuse_confidence']
ioc.tags.extend([abuse.get('country',''), abuse.get('isp','')])
# OTX
otx = enrich_ip_otx(ioc_value)
if otx:
ioc.pulse_count = otx.pulse_count
ioc.malware_families = otx.malware_families
ioc.mitre_techniques = otx.mitre_techniques
time.sleep(1) # Rate limiting
elif ioc_type in ('hash_md5', 'hash_sha256'):
vt = enrich_hash_vt(ioc_value)
if vt:
ioc.vt_positives = vt['malicious']
ioc.vt_total = vt['total_engines']
baz = enrich_hash_bazaar(ioc_value)
if baz:
ioc.malware_families = [baz.get('signature','')]
ioc.enriched = True
ioc_dict = ioc.to_dict()
ioc_dict['risk_score'] = ioc.risk_score()
return ioc_dict
# Load IOC list from incident findings
ioc_list = [
{"value": "185.234.219.44", "type": "ip"},
{"value": "db349b97c37d22f5ea1d1841e3c89eb4", "type": "hash_md5"},
]
results = []
for item in ioc_list:
print(f"\n--- Enriching {item['type']}: {item['value']} ---")
result = enrich_ioc(item['value'], item['type'])
results.append(result)
# Save to local TIP database
with open('tip_database.json', 'a') as f:
for r in results:
f.write(json.dumps(r) + '\n')
# Summary report
df = pd.DataFrame(results)
print("\n=== ENRICHMENT SUMMARY ===")
print(df[['value','ioc_type','risk_score','abuse_score','vt_positives',
'pulse_count','malware_families']].to_string())
EOF
python3 enrich_iocs.py
cat > ~/tip-lab/generate_stix.py << 'EOF'
import json
from stix2 import (Bundle, Indicator, Malware, Relationship,
ThreatActor, AttackPattern, IPv4Address, DomainName)
import datetime
def create_stix_bundle(enriched_iocs):
objects = []
# Create a threat actor (based on OTX/MITRE data)
actor = ThreatActor(
name="Unknown Threat Actor",
description="Actor identified via IOC enrichment — ransomware campaign",
threat_actor_types=["criminal"],
sophistication="intermediate",
resource_level="individual",
primary_motivation="financial-gain",
)
objects.append(actor)
for ioc_data in enriched_iocs:
if ioc_data['risk_score'] < 40:
continue # Only export high-confidence IOCs
if ioc_data['ioc_type'] == 'ip':
# Create STIX Indicator for IP
indicator = Indicator(
name=f"Malicious IP: {ioc_data['value']}",
description=f"IP with {ioc_data['abuse_score']}% AbuseIPDB confidence, "
f"{ioc_data['pulse_count']} OTX pulses",
pattern=f"[ipv4-addr:value = '{ioc_data['value']}']",
pattern_type="stix",
valid_from=datetime.datetime.utcnow().isoformat() + "Z",
indicator_types=["malicious-activity"],
confidence=ioc_data['risk_score'],
labels=ioc_data.get('tags', []),
)
objects.append(indicator)
# Link to threat actor
rel = Relationship(
relationship_type="attributed-to",
source_ref=indicator.id,
target_ref=actor.id,
)
objects.append(rel)
elif 'hash' in ioc_data['ioc_type']:
hash_type = 'MD5' if 'md5' in ioc_data['ioc_type'] else 'SHA-256'
indicator = Indicator(
name=f"Malicious Hash: {ioc_data['value'][:16]}...",
description=f"Detected by {ioc_data['vt_positives']}/{ioc_data['vt_total']} AV engines",
pattern=f"[file:hashes.'{hash_type}' = '{ioc_data['value']}']",
pattern_type="stix",
valid_from=datetime.datetime.utcnow().isoformat() + "Z",
indicator_types=["malicious-activity"],
)
objects.append(indicator)
bundle = Bundle(objects=objects)
with open('threat_bundle.json', 'w') as f:
f.write(bundle.serialize(pretty=True))
print(f"STIX bundle created with {len(objects)} objects")
return bundle
# Load from TIP database
enriched = []
with open('tip_database.json') as f:
for line in f:
enriched.append(json.loads(line))
bundle = create_stix_bundle(enriched)
EOF
python3 generate_stix.py
cat > ~/tip-lab/otx_feed.py << 'EOF'
import os
from OTXv2 import OTXv2
import json
otx = OTXv2(os.environ.get('OTX_KEY', ''))
# Subscribe to relevant pulses
def get_recent_pulses(days_ago=7, limit=10):
"""Fetch recent threat intelligence pulses."""
pulses = otx.getall(modified_since=None, limit=limit)
results = []
for pulse in pulses[:limit]:
iocs = []
for indicator in pulse.get('indicators', []):
iocs.append({
'value': indicator.get('indicator'),
'type': indicator.get('type'),
})
results.append({
'name': pulse.get('name'),
'description': pulse.get('description', '')[:200],
'author': pulse.get('author', {}).get('username'),
'tags': pulse.get('tags', []),
'malware_families': pulse.get('malware_families', []),
'ioc_count': len(iocs),
'iocs': iocs[:20], # First 20 IOCs
'references': pulse.get('references', []),
})
print(f"\n[Pulse] {pulse.get('name')}")
print(f" Author: {pulse.get('author',{}).get('username')}")
print(f" IOCs: {len(iocs)}")
print(f" Tags: {pulse.get('tags', [])[:5]}")
with open('otx_feed.json', 'w') as f:
json.dump(results, f, indent=2)
return results
pulses = get_recent_pulses()
print(f"\nFetched {len(pulses)} pulses from OTX")
EOF
python3 otx_feed.py
cat > ~/tip-lab/export_blocklist.py << 'EOF'
import json, datetime
HIGH_RISK_THRESHOLD = 70
with open('tip_database.json') as f:
iocs = [json.loads(line) for line in f]
# Filter high-confidence IOCs
high_risk = [i for i in iocs if i.get('risk_score', 0) >= HIGH_RISK_THRESHOLD]
# Export as plain IP blocklist (for firewall/iptables)
with open('ip_blocklist.txt', 'w') as f:
f.write(f"# CyberSec Pro Academy TIP — Generated {datetime.datetime.utcnow().isoformat()}\n")
f.write(f"# Risk threshold: {HIGH_RISK_THRESHOLD}/100\n")
f.write(f"# Total IOCs: {len(high_risk)}\n\n")
for ioc in high_risk:
if ioc['ioc_type'] == 'ip':
f.write(f"{ioc['value']} # risk={ioc['risk_score']} {ioc.get('malware_families','')}\n")
# Export as Snort/Suricata rules
with open('emerging_threats.rules', 'w') as f:
for ioc in high_risk:
if ioc['ioc_type'] == 'ip':
f.write(f'alert ip {ioc["value"]} any -> $HOME_NET any '
f'(msg:"CyberSec TIP - High Risk IP {ioc["value"]}"; '
f'classtype:trojan-activity; sid:{abs(hash(ioc["value"])) % 9000000 + 1000000};)\n')
# Export as Splunk lookup CSV
import csv
with open('tip_lookup.csv', 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=['value','ioc_type','risk_score',
'malware_families','pulse_count','abuse_score'])
writer.writeheader()
for ioc in iocs:
writer.writerow({k: str(ioc.get(k,'')) for k in writer.fieldnames})
print(f"Exported {len(high_risk)} high-risk IOCs")
print("Files: ip_blocklist.txt, emerging_threats.rules, tip_lookup.csv")
EOF
python3 export_blocklist.py
Use your TIP data to hunt for matches in historical Splunk logs:
# In Splunk (via CLI or web UI), load the TIP lookup table # First, upload tip_lookup.csv as a lookup table in Splunk # Splunk SPL — match network logs against TIP index=network OR index=proxy | eval ioc_value=src_ip | lookup tip_lookup.csv value AS ioc_value OUTPUT risk_score malware_families abuse_score | where isnotnull(risk_score) AND risk_score > 50 | table _time, src_ip, dest_ip, dest_port, risk_score, malware_families, abuse_score | sort -risk_score # Hunt for known-bad file hashes in endpoint logs index=endpoint EventCode=4688 | eval ioc_value=lower(Hashes) | lookup tip_lookup.csv value AS ioc_value OUTPUT risk_score malware_families | where isnotnull(risk_score) | table _time, ComputerName, SubjectUserName, NewProcessName, Hashes, malware_families
| Metric | Value |
|---|---|
| IOCs enriched | |
| High-risk IOCs (>70) | |
| OTX pulses matched | |
| Malware families identified | |
| STIX objects generated | |
| Splunk hits in historical logs |