0 / 11 steps complete
Intermediate AI-Assisted Threat Intelligence

L16: Threat Intelligence — IOC Aggregation & Enrichment

Build a Python-based Threat Intelligence Platform that aggregates IOCs from AlienVault OTX, AbuseIPDB, VirusTotal, and Shodan. Automate IOC enrichment, build a local threat feed, generate STIX bundles, and integrate threat data into your detection pipeline.

Python 3 AlienVault OTX AbuseIPDB VirusTotal STIX/TAXII Kali Linux
Phase 1: Environment & API Setup
1 Register for free threat intel API keys

Register for free API keys at these threat intelligence platforms:

| Platform | URL | Free Tier |
|---|---|---|
| AlienVault OTX | otx.alienvault.com | Unlimited read, 500/day write |
| AbuseIPDB | abuseipdb.com | 1,000 checks/day |
| VirusTotal | virustotal.com | 4 lookups/min, 500/day |
| Shodan | shodan.io | Free (limited) |
| MalwareBazaar | bazaar.abuse.ch | Unlimited |
# Install the client libraries (the STIX 2 library is published on PyPI as "stix2").
pip3 install OTXv2 requests stix2 pandas

# -p keeps the command re-runnable when the directory already exists.
mkdir -p ~/tip-lab && cd ~/tip-lab

# Store API keys securely (never hardcode)
cat > ~/.tip_keys << 'EOF'
OTX_KEY=your_otx_api_key_here
ABUSEIPDB_KEY=your_abuseipdb_key_here
VT_KEY=your_virustotal_key_here
SHODAN_KEY=your_shodan_key_here
EOF
chmod 600 ~/.tip_keys

# The lab scripts read the keys with os.environ.get(), so the variables must be
# exported to child processes. "set -a" marks every variable assigned while the
# file is sourced for export; "set +a" switches that behaviour off again.
# Re-run these three lines in every new terminal session.
set -a
source ~/.tip_keys
set +a
echo "API keys configured"
2 Build the core IOC data model
cat > ~/tip-lab/ioc_model.py << 'EOF'
"""Core IOC data model shared by the enrichment scripts in this lab."""
import dataclasses
import datetime
import json
from typing import List, Optional


@dataclasses.dataclass
class IOC:
    """A single indicator of compromise plus the enrichment collected for it."""

    value: str
    ioc_type: str  # ip, domain, url, hash_md5, hash_sha256, email
    source: str
    first_seen: Optional[str] = None
    last_seen: Optional[str] = None
    confidence: int = 50  # 0-100
    malware_families: List[str] = dataclasses.field(default_factory=list)
    tags: List[str] = dataclasses.field(default_factory=list)
    pulse_count: int = 0  # OTX pulses referencing this
    abuse_score: int = 0  # AbuseIPDB confidence score
    vt_positives: int = 0  # VirusTotal detection count
    vt_total: int = 0  # VirusTotal engine count
    threat_actor: Optional[str] = None
    mitre_techniques: List[str] = dataclasses.field(default_factory=list)
    enriched: bool = False
    # Timezone-aware UTC timestamp; a naive utcnow() value carries no offset
    # and datetime.utcnow() is deprecated in recent Python releases.
    created_at: str = dataclasses.field(
        default_factory=lambda: datetime.datetime.now(
            datetime.timezone.utc).isoformat())

    def risk_score(self):
        """Composite risk score 0-100.

        Adds up to 35 points for the AbuseIPDB score, up to 40 for the
        VirusTotal detection ratio and up to 25 for the OTX pulse count.
        """
        score = 0
        if self.abuse_score > 75:
            score += 35
        elif self.abuse_score > 25:
            score += 20
        if self.vt_total > 0:  # guard against division by zero
            vt_ratio = self.vt_positives / self.vt_total
            score += int(vt_ratio * 40)
        if self.pulse_count >= 5:
            score += 25
        elif self.pulse_count >= 2:
            score += 15
        return min(score, 100)

    def to_dict(self):
        """Return the IOC as a plain dict (JSON-serialisable)."""
        return dataclasses.asdict(self)

    @classmethod
    def from_dict(cls, d):
        """Build an IOC from a dict, ignoring keys that are not IOC fields.

        Rows saved by enrich_iocs.py carry an extra 'risk_score' key, which
        would make a plain cls(**d) raise TypeError.
        """
        known = {f.name for f in dataclasses.fields(cls)}
        return cls(**{k: v for k, v in d.items() if k in known})


if __name__ == '__main__':
    # Test
    ioc = IOC(value="185.234.219.44", ioc_type="ip", source="manual",
              abuse_score=95, vt_positives=35, vt_total=94, pulse_count=12,
              tags=["c2", "ransomware"], malware_families=["BlackCat"])
    print(f"IOC: {ioc.value}")
    print(f"Risk Score: {ioc.risk_score()}/100")
    print(json.dumps(ioc.to_dict(), indent=2))
EOF
cd ~/tip-lab && python3 ioc_model.py
Phase 2: Multi-Source IOC Enrichment
3 Query AlienVault OTX for IP/domain intelligence
cat > ~/tip-lab/otx_enricher.py << 'EOF'
"""AlienVault OTX enrichment for IP addresses."""
import os

from OTXv2 import OTXv2, IndicatorTypes

from ioc_model import IOC

OTX_KEY = os.environ.get('OTX_KEY', '')
otx = OTXv2(OTX_KEY)

ATTACK_MARKER = 'attack.mitre.org/techniques/'


def _family_name(entry):
    """Return a printable name for one malware_families entry.

    NOTE(review): indicator-detail responses appear to return objects with
    'display_name'/'id' keys rather than plain strings - confirm against the
    live API. Dicts are unhashable, so they must be flattened before
    de-duplication.
    """
    if isinstance(entry, dict):
        return entry.get('display_name') or entry.get('id') or ''
    return str(entry)


def _technique_id(ref):
    """Extract an ATT&CK technique ID (e.g. T1059.001) from a reference URL."""
    if not isinstance(ref, str) or ATTACK_MARKER not in ref:
        return None
    # strip('/') tolerates a trailing slash; sub-technique URLs use
    # .../T1059/001/, which maps to the dotted form T1059.001.
    tail = ref.split(ATTACK_MARKER, 1)[1].strip('/')
    return tail.replace('/', '.') or None


def _unique(items):
    """De-duplicate while keeping first-seen order and dropping empty values."""
    return list(dict.fromkeys(i for i in items if i))


def enrich_ip_otx(ip_address):
    """Fetch OTX intelligence for an IP address.

    Returns an IOC populated from the OTX pulses that reference the address,
    or None if the lookup fails for any reason (the error is printed).
    """
    try:
        # General info
        general = otx.get_indicator_details_full(IndicatorTypes.IPv4, ip_address)
        base = general.get('general', {})

        # Pulse (threat report) info
        pulse_info = base.get('pulse_info', {})
        pulses = pulse_info.get('pulses', [])

        # Extract malware families and tags
        families = []
        tags = []
        techniques = []
        for pulse in pulses:
            families.extend(_family_name(f)
                            for f in pulse.get('malware_families', []))
            tags.extend(pulse.get('tags', []))
            # Some pulses include ATT&CK technique references
            for ref in pulse.get('references', []):
                tid = _technique_id(ref)
                if tid:
                    techniques.append(tid)

        ioc = IOC(
            value=ip_address,
            ioc_type='ip',
            source='AlienVault OTX',
            pulse_count=pulse_info.get('count', 0),
            malware_families=_unique(families)[:5],
            tags=_unique(tags)[:10],
            mitre_techniques=_unique(techniques),
            first_seen=base.get('first_seen'),
            last_seen=base.get('last_seen'),
            enriched=True,
        )
        print(f"\n[OTX] {ip_address}")
        print(f" Pulses: {ioc.pulse_count}")
        print(f" Families: {ioc.malware_families}")
        print(f" Tags: {ioc.tags[:5]}")
        return ioc
    except Exception as e:
        # Best-effort enrichment: report the failure and let the caller go on.
        print(f"OTX error for {ip_address}: {e}")
        return None


if __name__ == '__main__':
    test_ips = ["185.234.219.44", "45.142.212.100"]
    for ip in test_ips:
        enrich_ip_otx(ip)
EOF
cd ~/tip-lab && python3 otx_enricher.py
4 Enrich IPs with AbuseIPDB confidence scores
cat > ~/tip-lab/abuseipdb_enricher.py << 'EOF'
"""AbuseIPDB enrichment for IP addresses."""
import os

import requests

ABUSEIPDB_KEY = os.environ.get('ABUSEIPDB_KEY', '')

# requests never times out on its own; without this a stalled connection
# would hang the whole enrichment pipeline.
REQUEST_TIMEOUT = 15  # seconds

# AbuseIPDB category codes
CAT_NAMES = {18: 'Brute-Force', 14: 'Port Scan', 3: 'Fraud',
             21: 'Web App Attack', 15: 'Hacking', 4: 'DDoS Attack'}


def enrich_ip_abuseipdb(ip_address, max_age_days=90):
    """Query AbuseIPDB for abuse reports.

    Returns a dict summarising the reports for ip_address, or None when the
    request fails or the API answers with a non-200 status.
    """
    headers = {'Key': ABUSEIPDB_KEY, 'Accept': 'application/json'}
    params = {'ipAddress': ip_address, 'maxAgeInDays': max_age_days,
              'verbose': True}
    try:
        resp = requests.get('https://api.abuseipdb.com/api/v2/check',
                            headers=headers, params=params,
                            timeout=REQUEST_TIMEOUT)
    except requests.RequestException as e:
        # Same contract as the OTX enricher: report and return None.
        print(f"AbuseIPDB error: {e}")
        return None
    if resp.status_code != 200:
        print(f"AbuseIPDB error: {resp.status_code}")
        return None

    data = resp.json().get('data', {})
    reports = data.get('reports', [])
    result = {
        'ip': ip_address,
        'abuse_confidence': data.get('abuseConfidenceScore', 0),
        'total_reports': data.get('totalReports', 0),
        'country': data.get('countryCode', 'Unknown'),
        'isp': data.get('isp', 'Unknown'),
        'is_tor': data.get('isTor', False),
        'is_public': data.get('isPublic', True),
        'last_reported': data.get('lastReportedAt', ''),
        # Sorted so repeated runs print the categories in a stable order.
        'categories': sorted({cat for r in reports
                              for cat in r.get('categories', [])}),
        'distinct_reporters': data.get('numDistinctUsers', 0),
    }

    print(f"\n[AbuseIPDB] {ip_address}")
    print(f" Confidence: {result['abuse_confidence']}%")
    print(f" Reports: {result['total_reports']} from {result['distinct_reporters']} users")
    print(f" Country/ISP: {result['country']} / {result['isp']}")
    print(f" Tor exit: {result['is_tor']}")
    print(f" Categories: {[CAT_NAMES.get(c, str(c)) for c in result['categories']]}")
    return result


if __name__ == '__main__':
    ips = ["185.234.219.44", "8.8.8.8"]
    for ip in ips:
        enrich_ip_abuseipdb(ip)
EOF
cd ~/tip-lab && python3 abuseipdb_enricher.py
5 Hash lookups via VirusTotal and MalwareBazaar
cat > ~/tip-lab/vt_enricher.py << 'EOF'
"""File-hash enrichment via VirusTotal and MalwareBazaar."""
import os
import time

import requests

VT_KEY = os.environ.get('VT_KEY', '')
# NOTE(review): abuse.ch may now require an Auth-Key header for API access -
# confirm. The header is only sent when this variable is set.
BAZAAR_KEY = os.environ.get('BAZAAR_KEY', '')

REQUEST_TIMEOUT = 30  # seconds; requests has no default timeout
VT_RATE_DELAY = 15    # seconds; VT rate limit: 4/min on free tier


def enrich_hash_vt(file_hash):
    """Query VirusTotal for file hash analysis.

    Returns a summary dict for a known hash. Returns None when the hash is
    unknown, the request fails, or the API answers with any other non-200
    status (for example 401 for a bad key or 429 when rate limited).
    """
    headers = {'x-apikey': VT_KEY}
    try:
        resp = requests.get(
            f'https://www.virustotal.com/api/v3/files/{file_hash}',
            headers=headers, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as e:
        print(f"[VT] {file_hash[:16]}... request failed: {e}")
        return None

    if resp.status_code == 404:
        print(f"[VT] {file_hash[:16]}... — NOT FOUND")
        return None
    if resp.status_code != 200:
        # Without this branch 'result' below would never be assigned.
        print(f"[VT] {file_hash[:16]}... error: HTTP {resp.status_code}")
        return None

    data = resp.json()['data']['attributes']
    stats = data.get('last_analysis_stats', {})
    results = data.get('last_analysis_results', {})
    # Get detecting engine names
    detecting = [eng for eng, res in results.items()
                 if res.get('category') == 'malicious']
    result = {
        'hash': file_hash,
        'name': data.get('meaningful_name', 'Unknown'),
        'type': data.get('type_description', 'Unknown'),
        'size': data.get('size', 0),
        'malicious': stats.get('malicious', 0),
        'suspicious': stats.get('suspicious', 0),
        'undetected': stats.get('undetected', 0),
        'total_engines': sum(stats.values()),
        'first_submission': data.get('first_submission_date', ''),
        'popular_names': data.get('popular_threat_name', ''),
        'signature': data.get('signature_info', {}).get('subject', ''),
        'detecting_engines': detecting[:10],
    }
    print(f"\n[VirusTotal] {file_hash[:16]}...")
    print(f" Name: {result['name']}")
    print(f" Detections: {result['malicious']}/{result['total_engines']}")
    print(f" Type: {result['type']}")
    print(f" Top detectors: {result['detecting_engines'][:5]}")
    time.sleep(VT_RATE_DELAY)
    return result


def enrich_hash_bazaar(file_hash):
    """Query MalwareBazaar for a sample by hash.

    Returns the first matching sample dict, or None when the hash is unknown,
    the request fails, or the API reports any status other than 'ok'.
    """
    headers = {'Auth-Key': BAZAAR_KEY} if BAZAAR_KEY else {}
    try:
        resp = requests.post('https://mb-api.abuse.ch/api/v1/',
                             data={'query': 'get_info', 'hash': file_hash},
                             headers=headers, timeout=REQUEST_TIMEOUT)
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body (e.g. an HTML error page).
        print(f"[MalwareBazaar] {file_hash[:16]}... request failed: {e}")
        return None

    status = data.get('query_status')
    if status == 'hash_not_found':
        return None
    samples = data.get('data') or []
    if status != 'ok' or not samples:
        print(f"[MalwareBazaar] {file_hash[:16]}... status: {status}")
        return None

    sample = samples[0]
    print(f"\n[MalwareBazaar] {file_hash[:16]}...")
    print(f" Family: {sample.get('signature', 'Unknown')}")
    print(f" First seen: {sample.get('first_seen', '')}")
    print(f" Tags: {sample.get('tags', [])}")
    return sample


if __name__ == '__main__':
    # Known malware hash used as a test input
    test_hashes = [
        "db349b97c37d22f5ea1d1841e3c89eb4",  # WannaCry MD5
    ]
    for h in test_hashes:
        enrich_hash_vt(h)
        enrich_hash_bazaar(h)
EOF
cd ~/tip-lab && python3 vt_enricher.py
Phase 3: Automated IOC Pipeline & Local TIP
6 Build automated multi-source enrichment pipeline
cat > ~/tip-lab/enrich_iocs.py << 'EOF'
"""Run every available enricher over a list of IOCs and store the results."""
import json, time, os, sys

import pandas as pd

from ioc_model import IOC
from otx_enricher import enrich_ip_otx
from abuseipdb_enricher import enrich_ip_abuseipdb
from vt_enricher import enrich_hash_vt, enrich_hash_bazaar

TIP_DATABASE = 'tip_database.json'


def enrich_ioc(ioc_value, ioc_type):
    """Enriches an IOC across all available sources.

    Returns the IOC as a dict with an extra 'risk_score' key. Sources that
    return nothing simply leave the corresponding fields at their defaults.
    """
    ioc = IOC(value=ioc_value, ioc_type=ioc_type, source='manual')

    if ioc_type == 'ip':
        # AbuseIPDB
        abuse = enrich_ip_abuseipdb(ioc_value)
        if abuse:
            ioc.abuse_score = abuse['abuse_confidence']
            # Skip missing values so the tag list never holds '' or None.
            ioc.tags.extend(
                t for t in (abuse.get('country'), abuse.get('isp')) if t)
        # OTX
        otx = enrich_ip_otx(ioc_value)
        if otx:
            ioc.pulse_count = otx.pulse_count
            ioc.malware_families = otx.malware_families
            ioc.mitre_techniques = otx.mitre_techniques
        time.sleep(1)  # Rate limiting
    elif ioc_type in ('hash_md5', 'hash_sha256'):
        vt = enrich_hash_vt(ioc_value)
        if vt:
            ioc.vt_positives = vt['malicious']
            ioc.vt_total = vt['total_engines']
        baz = enrich_hash_bazaar(ioc_value)
        # Only record a family when MalwareBazaar actually names one.
        if baz and baz.get('signature'):
            ioc.malware_families = [baz['signature']]

    ioc.enriched = True
    ioc_dict = ioc.to_dict()
    ioc_dict['risk_score'] = ioc.risk_score()
    return ioc_dict


def main():
    """Enrich the sample IOC list, append to the TIP database, print a summary."""
    # Load IOC list from incident findings
    ioc_list = [
        {"value": "185.234.219.44", "type": "ip"},
        {"value": "db349b97c37d22f5ea1d1841e3c89eb4", "type": "hash_md5"},
    ]

    results = []
    for item in ioc_list:
        print(f"\n--- Enriching {item['type']}: {item['value']} ---")
        results.append(enrich_ioc(item['value'], item['type']))

    # Save to local TIP database (JSON Lines, opened in append mode, so
    # running the script again adds new rows for the same IOCs).
    with open(TIP_DATABASE, 'a') as f:
        for r in results:
            f.write(json.dumps(r) + '\n')

    # Summary report
    df = pd.DataFrame(results)
    print("\n=== ENRICHMENT SUMMARY ===")
    print(df[['value', 'ioc_type', 'risk_score', 'abuse_score', 'vt_positives',
              'pulse_count', 'malware_families']].to_string())


if __name__ == '__main__':
    main()
EOF
cd ~/tip-lab && python3 enrich_iocs.py
7 Generate STIX 2.1 threat intelligence bundle
cat > ~/tip-lab/generate_stix.py << 'EOF'
"""Turn the enriched IOCs in the local TIP database into a STIX 2.1 bundle."""
import json
from stix2 import (Bundle, Indicator, Malware, Relationship, ThreatActor,
                   AttackPattern, IPv4Address, DomainName)
import datetime

# Only export high-confidence IOCs.
# NOTE: with the risk_score() weights in ioc_model.py, a hash that has only
# VirusTotal data scores at most 40, so it passes this filter only at a 100%
# detection ratio.
MIN_RISK_SCORE = 40


def create_stix_bundle(enriched_iocs):
    """Build a STIX bundle from enriched IOC dicts and write threat_bundle.json.

    IOCs below MIN_RISK_SCORE are skipped. When the same value appears more
    than once (the TIP database is append-only) the last row wins.
    Returns the stix2 Bundle object.
    """
    objects = []
    now = datetime.datetime.now(datetime.timezone.utc)

    # Create a threat actor (based on OTX/MITRE data)
    actor = ThreatActor(
        name="Unknown Threat Actor",
        description="Actor identified via IOC enrichment — ransomware campaign",
        threat_actor_types=["criminal"],
        sophistication="intermediate",
        resource_level="individual",
        # attack-motivation-ov has no "financial-gain"; "personal-gain" is
        # the vocabulary term for financially motivated actors.
        primary_motivation="personal-gain",
    )
    objects.append(actor)

    # De-duplicate by value, keeping the most recent row for each IOC.
    latest = {ioc['value']: ioc for ioc in enriched_iocs}

    for ioc_data in latest.values():
        if ioc_data.get('risk_score', 0) < MIN_RISK_SCORE:
            continue

        if ioc_data['ioc_type'] == 'ip':
            # Create STIX Indicator for IP
            indicator_args = dict(
                name=f"Malicious IP: {ioc_data['value']}",
                description=f"IP with {ioc_data['abuse_score']}% AbuseIPDB confidence, "
                            f"{ioc_data['pulse_count']} OTX pulses",
                pattern=f"[ipv4-addr:value = '{ioc_data['value']}']",
                pattern_type="stix",
                valid_from=now,
                indicator_types=["malicious-activity"],
                confidence=ioc_data['risk_score'],
            )
            # STIX prohibits empty lists, so only set labels when tags exist.
            labels = [t for t in ioc_data.get('tags', []) if t]
            if labels:
                indicator_args['labels'] = labels
            indicator = Indicator(**indicator_args)
            objects.append(indicator)

            # Link to threat actor. STIX 2.1 defines "indicates" for
            # Indicator -> Threat Actor; "attributed-to" is for campaigns and
            # intrusion sets.
            rel = Relationship(
                relationship_type="indicates",
                source_ref=indicator.id,
                target_ref=actor.id,
            )
            objects.append(rel)

        elif 'hash' in ioc_data['ioc_type']:
            hash_type = 'MD5' if 'md5' in ioc_data['ioc_type'] else 'SHA-256'
            indicator = Indicator(
                name=f"Malicious Hash: {ioc_data['value'][:16]}...",
                description=f"Detected by {ioc_data['vt_positives']}/{ioc_data['vt_total']} AV engines",
                pattern=f"[file:hashes.'{hash_type}' = '{ioc_data['value']}']",
                pattern_type="stix",
                valid_from=now,
                indicator_types=["malicious-activity"],
                confidence=ioc_data['risk_score'],
            )
            objects.append(indicator)

    bundle = Bundle(objects=objects)
    with open('threat_bundle.json', 'w') as f:
        f.write(bundle.serialize(pretty=True))
    print(f"STIX bundle created with {len(objects)} objects")
    return bundle


if __name__ == '__main__':
    # Load from TIP database (one JSON object per line; skip blank lines)
    enriched = []
    with open('tip_database.json') as f:
        for line in f:
            if line.strip():
                enriched.append(json.loads(line))
    bundle = create_stix_bundle(enriched)
EOF
cd ~/tip-lab && python3 generate_stix.py
Phase 4: Threat Feed Integration & Reporting
8 Subscribe to OTX threat pulse feeds
cat > ~/tip-lab/otx_feed.py << 'EOF'
"""Pull recent pulses from the OTX feed you are subscribed to."""
import datetime
import itertools
import os
from OTXv2 import OTXv2
import json

otx = OTXv2(os.environ.get('OTX_KEY', ''))


# Subscribe to relevant pulses
def get_recent_pulses(days_ago=7, limit=10):
    """Fetch recent threat intelligence pulses.

    Returns up to `limit` pulses modified within the last `days_ago` days as
    summary dicts, and also writes them to otx_feed.json.
    """
    since = (datetime.datetime.now(datetime.timezone.utc)
             - datetime.timedelta(days=days_ago)).isoformat()
    # getall() downloads the entire subscribed feed; its `limit` argument is
    # the page size only. Iterating lazily and stopping after `limit` pulses
    # keeps the request small.
    feed = otx.getall_iter(modified_since=since, limit=limit)

    results = []
    for pulse in itertools.islice(feed, limit):
        iocs = [{'value': indicator.get('indicator'),
                 'type': indicator.get('type')}
                for indicator in pulse.get('indicators', [])]
        # NOTE(review): subscribed-feed pulses appear to expose the author as
        # 'author_name' rather than author.username - confirm; both are tried.
        author = (pulse.get('author_name')
                  or (pulse.get('author') or {}).get('username'))
        results.append({
            'name': pulse.get('name'),
            # `or ''` also covers a description that is present but null.
            'description': (pulse.get('description') or '')[:200],
            'author': author,
            'tags': pulse.get('tags', []),
            'malware_families': pulse.get('malware_families', []),
            'ioc_count': len(iocs),
            'iocs': iocs[:20],  # First 20 IOCs
            'references': pulse.get('references', []),
        })
        print(f"\n[Pulse] {pulse.get('name')}")
        print(f" Author: {author}")
        print(f" IOCs: {len(iocs)}")
        print(f" Tags: {pulse.get('tags', [])[:5]}")

    with open('otx_feed.json', 'w') as f:
        json.dump(results, f, indent=2)
    return results


if __name__ == '__main__':
    pulses = get_recent_pulses()
    print(f"\nFetched {len(pulses)} pulses from OTX")
EOF
cd ~/tip-lab && python3 otx_feed.py
9 Export IOC blocklist for firewall/SIEM integration
cat > ~/tip-lab/export_blocklist.py << 'EOF'
"""Export the TIP database as a blocklist, Snort/Suricata rules and a Splunk lookup."""
import csv
import datetime
import json
import zlib

HIGH_RISK_THRESHOLD = 70


def stable_sid(value):
    """Map an IOC value to a rule SID in the range 1000000-9999999.

    The built-in hash() is salted per process for strings, so it would give
    the same IP a different SID on every run; CRC32 is stable across runs.
    """
    return zlib.crc32(value.encode('utf-8')) % 9000000 + 1000000


with open('tip_database.json') as f:
    rows = [json.loads(line) for line in f if line.strip()]

# The database is append-only, so keep only the latest row per IOC value;
# duplicates would otherwise produce repeated rules sharing one SID.
iocs = list({row['value']: row for row in rows}.values())

# Filter high-confidence IOCs
high_risk = [i for i in iocs if i.get('risk_score', 0) >= HIGH_RISK_THRESHOLD]
high_risk_ips = [i for i in high_risk if i['ioc_type'] == 'ip']

# Export as plain IP blocklist (for firewall/iptables)
generated = datetime.datetime.now(datetime.timezone.utc).isoformat()
with open('ip_blocklist.txt', 'w') as f:
    f.write(f"# CyberSec Pro Academy TIP — Generated {generated}\n")
    f.write(f"# Risk threshold: {HIGH_RISK_THRESHOLD}/100\n")
    f.write(f"# Total IOCs: {len(high_risk_ips)}\n\n")
    for ioc in high_risk_ips:
        f.write(f"{ioc['value']} # risk={ioc['risk_score']} {ioc.get('malware_families','')}\n")

# Export as Snort/Suricata rules
with open('emerging_threats.rules', 'w') as f:
    for ioc in high_risk_ips:
        f.write(f'alert ip {ioc["value"]} any -> $HOME_NET any '
                f'(msg:"CyberSec TIP - High Risk IP {ioc["value"]}"; '
                f'classtype:trojan-activity; sid:{stable_sid(ioc["value"])}; rev:1;)\n')

# Export as Splunk lookup CSV
with open('tip_lookup.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['value', 'ioc_type', 'risk_score',
                                           'malware_families', 'pulse_count',
                                           'abuse_score'])
    writer.writeheader()
    for ioc in iocs:
        row = {k: str(ioc.get(k, '')) for k in writer.fieldnames}
        # Write families as readable text instead of a Python list repr.
        row['malware_families'] = '; '.join(
            str(m) for m in ioc.get('malware_families') or [])
        writer.writerow(row)

print(f"Exported {len(high_risk)} high-risk IOCs")
print("Files: ip_blocklist.txt, emerging_threats.rules, tip_lookup.csv")
EOF
cd ~/tip-lab && python3 export_blocklist.py
10 Search IOCs against your Splunk SIEM

Use your TIP data to hunt for matches in historical Splunk logs:

# In Splunk (via CLI or web UI), load the TIP lookup table
# First, upload tip_lookup.csv as a lookup table in Splunk

# Splunk SPL — match network logs against TIP
index=network OR index=proxy
| eval ioc_value=src_ip
| lookup tip_lookup.csv value AS ioc_value OUTPUT risk_score malware_families abuse_score
| where isnotnull(risk_score) AND risk_score > 50
| table _time, src_ip, dest_ip, dest_port, risk_score, malware_families, abuse_score
| sort -risk_score

# Hunt for known-bad file hashes in endpoint logs
# NOTE(review): the standard Windows Security 4688 event carries no file hash.
# "Hashes" is normally a Sysmon Event ID 1 field formatted like
# "MD5=...,SHA256=...". Confirm your endpoint data has a bare-hash Hashes
# field, or extract the hash before the lookup.
index=endpoint EventCode=4688
| eval ioc_value=lower(Hashes)
| lookup tip_lookup.csv value AS ioc_value OUTPUT risk_score malware_families
| where isnotnull(risk_score)
| table _time, ComputerName, SubjectUserName, NewProcessName, Hashes, malware_families
11 Document TIP findings and threat landscape report

Lab Findings

Metric | Value
IOCs enriched
High-risk IOCs (risk score ≥ 70)
OTX pulses matched
Malware families identified
STIX objects generated
Splunk hits in historical logs

Next: Lab L17 — Log Analysis & IOC Extraction with AI

Parse security logs at scale and extract indicators of compromise with AI assistance.

Start L17 →
AI Threat Intelligence Analyst

Enter your Anthropic API key to activate the AI analyst:

Quick Prompts: