Parse Windows EVTX, Linux syslog, Apache access logs, and firewall logs using Python on SIFT Workstation. Automate IOC extraction with regex and NLP, correlate events across log sources, and feed results into your Splunk detection pipeline.
pip3 install python-evtx pandas matplotlib ipaddress
mkdir ~/log-lab && cd ~/log-lab
mkdir logs iocs reports
# Generate synthetic log data for practice
cat > generate_logs.py << 'EOF'
import random, datetime, ipaddress
# --- Windows Security EVTX (simulated CSV) ---
# Every record is one pipe-delimited line:
#   time|event_id|user|workstation|source_ip
users = ['alice','bob','svc_backup','SYSTEM','admin']
event_ids = [4624,4625,4672,4688,4648,4698,7045,4104]
event_weights = [50,10,5,20,3,2,1,1]
base = datetime.datetime(2024, 3, 1, 8, 0, 0)

events = []
for _ in range(2000):
    # Background noise: a random second within the 24 hours after `base`.
    stamp = base + datetime.timedelta(seconds=random.randint(0, 86400))
    user = random.choice(users)
    eid = random.choices(event_ids, weights=event_weights)[0]
    workstation = random.randint(1, 20)
    last_octet = random.randint(10, 50)
    events.append(
        f"{stamp.isoformat()}|{eid}|{user}|WS{workstation:02d}|"
        f"192.168.56.{last_octet}\n"
    )

# Inject malicious events: 30 pairs stamped in the 22:00 hour, all from
# admin on WS01. Unlike the background records, these lines carry a sixth
# pipe-delimited field with the command line / task description.
for _ in range(30):
    minute = random.randint(0, 59)
    second = random.randint(0, 59)
    stamp = datetime.datetime(2024, 3, 1, 22, minute, second).isoformat()
    events.append(
        f"{stamp}|4688|admin|WS01|192.168.56.10|"
        f"cmd.exe /c powershell -enc [BASE64PAYLOAD]\n"
    )
    events.append(
        f"{stamp}|4698|admin|WS01|192.168.56.10|"
        f"New scheduled task: WindowsUpdate\n"
    )

# random.sample over the full length writes the records in shuffled order.
with open('logs/windows_events.txt', 'w') as f:
    f.writelines(random.sample(events, len(events)))

# --- Apache access log ---
attacks = [
    'GET /wp-admin/admin-ajax.php?action=revslider_show_image&img=../wp-config.php',
    'GET /.env HTTP/1.1',
    'GET /etc/passwd HTTP/1.1',
    'POST /index.php?s=/Index/\\think\\app/invokefunction&function=call_user_func_array',
]
apache_time_format = "%d/%b/%Y:%H:%M:%S +0000"

with open('logs/apache_access.log', 'w') as f:
    # 500 ordinary page requests from random 192.168.1.x clients.
    for _ in range(500):
        stamp = base + datetime.timedelta(seconds=random.randint(0,86400))
        ip = f"192.168.1.{random.randint(1,254)}"
        status = random.choices([200,304,404,500],[60,20,15,5])[0]
        page = random.randint(1,20)
        size = random.randint(500,50000)
        f.write(
            f'{ip} - - [{stamp.strftime(apache_time_format)}] '
            f'"GET /page{page}.html HTTP/1.1" {status} {size}\n'
        )

    # Inject attacks: one request per entry in `attacks`, all from a single
    # external address and all stamped with the `base` time.
    attack_ip = "185.234.219.44"
    attack_stamp = base.strftime(apache_time_format)
    for attack in attacks:
        f.write(f'{attack_ip} - - [{attack_stamp}] "{attack}" 200 512\n')

print("Synthetic logs generated in logs/")
EOF
python3 generate_logs.py
cat > ~/log-lab/parse_evtx.py << 'EOF'
import Evtx.Evtx as evtx
import Evtx.Views as e_views
from lxml import etree
import pandas as pd, json, glob
def parse_evtx_file(evtx_path):
    """Parse one .evtx file into a DataFrame with one row per event record.

    Each record's XML is read for the System fields (EventID, TimeCreated,
    Computer) and for the named values under EventData. A fixed set of
    EventData values is promoted to columns; the full EventData mapping is
    kept as a JSON string in ``raw_data``.

    Parsing is best-effort: a record that cannot be parsed is skipped, and
    the number of skipped records is reported on stderr so that data loss is
    visible instead of silent.

    Args:
        evtx_path: Path to the .evtx file.

    Returns:
        pandas.DataFrame with columns time, event_id, computer, subject_user,
        target_user, logon_type, process_name, command_line, source_ip and
        raw_data. The frame has no columns when no record could be parsed.
    """
    import sys

    # Namespace map for the event schema; identical for every record.
    ns = {'ns': 'http://schemas.microsoft.com/win/2004/08/events/event'}
    records = []
    skipped = 0
    last_error = None

    with evtx.Evtx(evtx_path) as log:
        for record in log.records():
            try:
                xml = etree.fromstring(record.xml())
                system = xml.find('ns:System', ns)
                eid = system.findtext('ns:EventID', namespaces=ns)
                time_created = system.find('ns:TimeCreated', ns)
                time = time_created.get('SystemTime') if time_created is not None else ''
                computer = system.findtext('ns:Computer', namespaces=ns, default='')

                event_data = xml.find('ns:EventData', ns)
                data = {}
                if event_data is not None:
                    for item in event_data:
                        # Elements without a Name attribute get a positional key.
                        name = item.get('Name', f'Data_{len(data)}')
                        data[name] = item.text or ''

                records.append({
                    'time': time,
                    'event_id': eid,
                    'computer': computer,
                    'subject_user': data.get('SubjectUserName', ''),
                    'target_user': data.get('TargetUserName', ''),
                    'logon_type': data.get('LogonType', ''),
                    'process_name': data.get('NewProcessName', ''),
                    'command_line': data.get('CommandLine', ''),
                    # Fall back to the workstation name when no IpAddress is present.
                    'source_ip': data.get('IpAddress', data.get('WorkstationName', '')),
                    'raw_data': json.dumps(data),
                })
            except Exception as exc:
                # Deliberately broad: one bad record must not abort the file.
                skipped += 1
                last_error = exc
                continue

    if skipped:
        print(f"[warn] {evtx_path}: skipped {skipped} unparseable record(s); "
              f"last error: {last_error!r}", file=sys.stderr)
    return pd.DataFrame(records)
# Process all EVTX files
evtx_files = glob.glob('logs/*.evtx')
if not evtx_files:
    print("No EVTX files found. Using synthetic log data instead.")
    # Parse the synthetic text-based log.
    # Background records have five pipe-delimited fields, but the injected
    # malicious records carry a sixth (command line / task description).
    # Naming all six columns keeps those records; with only five names they
    # are treated as bad lines and skipped. Short rows get NaN in `details`.
    df = pd.read_csv('logs/windows_events.txt', sep='|',
                     names=['time', 'event_id', 'user', 'workstation',
                            'source_ip', 'details'],
                     on_bad_lines='skip')
else:
    dfs = [parse_evtx_file(f) for f in evtx_files]
    df = pd.concat(dfs, ignore_index=True)

# Unparseable timestamps become NaT rather than raising.
df['time'] = pd.to_datetime(df.get('time'), errors='coerce')
print(f"Parsed {len(df)} events")
print(df.groupby('event_id').size().sort_values(ascending=False).head())
df.to_csv('logs/parsed_events.csv', index=False)
EOF
python3 parse_evtx.py
cat > ~/log-lab/ioc_extractor.py << 'EOF'
import re, json
from collections import defaultdict
# IOC regex patterns, keyed by IOC type. All are compiled once at import.
PATTERNS = {
    # Dotted quad with every octet limited to 0-255.
    'ipv4': re.compile(
        r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
        r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'),
    # Fully expanded form only (eight groups); '::' compression is not matched.
    'ipv6': re.compile(
        r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b'),
    # Host names ending in one of a fixed list of TLDs.
    'domain': re.compile(
        r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+'
        r'(?:com|net|org|io|ru|cn|tk|ml|info|biz|xyz|top|pw|ga|cf|gq)\b'),
    'url': re.compile(
        r'https?://[^\s"\'<>\]\)]+'),
    'md5': re.compile(r'\b[a-fA-F0-9]{32}\b'),
    'sha256': re.compile(r'\b[a-fA-F0-9]{64}\b'),
    # TLD class is [A-Za-z]; a '|' inside a character class is a literal
    # pipe, not alternation, so it must not appear here.
    'email': re.compile(
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
    # Group 1 captures the base64 payload that follows -enc / -EncodedCommand.
    'base64_cmd': re.compile(
        r'-enc(?:odedcommand)?\s+([A-Za-z0-9+/]{20,}={0,2})', re.IGNORECASE),
    'cve': re.compile(r'CVE-\d{4}-\d{4,7}', re.IGNORECASE),
    # Word boundaries stop the pattern from matching inside longer tokens
    # such as compact timestamps (e.g. 20240301T221530).
    'mitre': re.compile(r'\bT\d{4}(?:\.\d{3})?\b'),
}
# Private IP ranges to filter
import ipaddress

PRIVATE_RANGES = [
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
    ipaddress.ip_network('127.0.0.0/8'),
]


def is_private(ip):
    """Return True if *ip* falls inside one of PRIVATE_RANGES.

    Args:
        ip: Address to test, in any form ``ipaddress.ip_address`` accepts.

    Returns:
        True when the address is inside a listed network. False otherwise,
        including when *ip* cannot be parsed as an IP address.
    """
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        # Only parse failures are treated as "not private"; anything else
        # (e.g. KeyboardInterrupt) is allowed to propagate.
        return False
    return any(addr in net for net in PRIVATE_RANGES)
def extract_iocs(text):
    """Collect the unique IOC strings found in *text*, grouped by IOC type.

    Every pattern in PATTERNS is run over the text and the full match is
    recorded. IPv4 matches for which ``is_private`` returns True are dropped.

    Args:
        text: Log content to scan.

    Returns:
        dict mapping IOC type to a sorted list of unique matched strings.
        Types with no matches are omitted. Sorting makes the result
        deterministic; plain set iteration order varies between runs.
    """
    found = defaultdict(set)
    for ioc_type, pattern in PATTERNS.items():
        for match in pattern.finditer(text):
            value = match.group(0)
            if ioc_type == 'ipv4' and is_private(value):
                continue  # Filter private IPs
            found[ioc_type].add(value)
    # Keys exist only for types that matched, so no empty entries to filter.
    return {ioc_type: sorted(values) for ioc_type, values in found.items()}
# Process log files
import glob, os

# Scan .log, then .txt, then .csv files under logs/.
log_paths = [
    path
    for file_glob in ('logs/*.log', 'logs/*.txt', 'logs/*.csv')
    for path in glob.glob(file_glob)
]

all_iocs = defaultdict(set)
for log_file in log_paths:
    # Undecodable bytes are dropped rather than raising.
    with open(log_file, errors='ignore') as f:
        content = f.read()
    iocs = extract_iocs(content)

    print(f"\n{os.path.basename(log_file)}:")
    for ioc_type, values in iocs.items():
        print(f" {ioc_type}: {len(values)} found")
        # Show at most three samples per type.
        samples = list(values)[:3]
        for sample in samples:
            print(f" {sample}")
        all_iocs[ioc_type].update(values)

# Save results
merged = {ioc_type: list(values) for ioc_type, values in all_iocs.items()}
with open('iocs/extracted_iocs.json', 'w') as f:
    json.dump(merged, f, indent=2)

print(f"\nTotal unique IOCs extracted:")
for ioc_type, values in all_iocs.items():
    print(f" {ioc_type}: {len(values)}")
EOF
python3 ioc_extractor.py
cat > ~/log-lab/decode_powershell.py << 'EOF'
import base64, re, json
def decode_ps_encoded(encoded_str):
    """Decode a PowerShell -EncodedCommand payload (base64 of UTF-16LE text).

    Args:
        encoded_str: Base64 text, with or without trailing '=' padding.

    Returns:
        The decoded command. If the bytes are not valid UTF-16LE they are
        decoded as UTF-8 with replacement characters instead. If the input
        is not valid base64, the string ``"[Decode error: <reason>]"``.
    """
    # Pad up to the next multiple of 4; adds nothing when already aligned.
    padded = encoded_str + '=' * (-len(encoded_str) % 4)
    try:
        raw = base64.b64decode(padded)
    except ValueError as e:
        # binascii.Error is a ValueError subclass; non-ASCII input also
        # raises ValueError.
        return f"[Decode error: {e}]"
    try:
        return raw.decode('utf-16-le')
    except UnicodeDecodeError:
        # Try standard base64 of plain (UTF-8) text.
        return raw.decode('utf-8', errors='replace')
# Substring checks applied to the lower-cased line, in reporting order.
# An indicator fires when any one of its needles is present.
_PS_SUBSTRING_INDICATORS = (
    (('iex', 'invoke-expression'), "IEX/Invoke-Expression — code injection"),
    (('downloadstring', 'downloadfile'), "Download from internet"),
    (('net.webclient', 'invoke-webrequest'), "Network download client"),
    (('bypass',), "ExecutionPolicy Bypass"),
    (('hidden',), "Hidden window"),
    (('mimikatz',), "Mimikatz reference"),
    (('$env:temp',), "Writing to TEMP directory"),
)


def analyze_powershell_line(line):
    """Analyze a PowerShell command for suspicious patterns.

    Encoded payloads following -enc / -EncodedCommand are decoded with
    ``decode_ps_encoded`` and analyzed recursively, so indicators hidden
    inside an encoded layer are reported as well.

    Args:
        line: A log line or command string.

    Returns:
        Tuple ``(indicators, decoded_cmds)``: a list of indicator
        descriptions and a list of decoded payload strings. Both lists
        include what was found in nested encoded layers.
    """
    line_lower = line.lower()
    indicators = []

    # Suspicious patterns
    if re.search(r'-enc(?:odedcommand)?', line_lower):
        indicators.append("EncodedCommand — obfuscation technique")
    for needles, description in _PS_SUBSTRING_INDICATORS:
        if any(needle in line_lower for needle in needles):
            indicators.append(description)

    # Extract and decode encoded segments
    enc_matches = re.findall(r'-enc(?:odedcommand)?\s+([A-Za-z0-9+/]{20,}={0,2})',
                             line, re.IGNORECASE)
    decoded_cmds = []
    for payload in enc_matches:
        decoded = decode_ps_encoded(payload)
        decoded_cmds.append(decoded)
        # The recursive call returns a tuple; unpack it so indicator strings
        # (not the two list objects) are merged into the results.
        nested_indicators, nested_decoded = analyze_powershell_line(decoded)
        indicators.extend(nested_indicators)
        decoded_cmds.extend(nested_decoded)
    return indicators, decoded_cmds
# Read logs and find PowerShell
with open('logs/windows_events.txt', errors='ignore') as f:
    content = f.read()

# Keep every line that mentions PowerShell, case-insensitively.
ps_lines = []
for raw_line in content.split('\n'):
    if 'powershell' in raw_line.lower():
        ps_lines.append(raw_line)
print(f"Found {len(ps_lines)} PowerShell-related log entries\n")

# Report on the first ten hits only; lines with no indicators print nothing.
for line in ps_lines[:10]:
    indicators, decoded = analyze_powershell_line(line)
    if not indicators:
        continue
    print(f"[!] Suspicious PowerShell:")
    print(f" {line[:120]}")
    for ind in indicators:
        print(f" → {ind}")
    for d in decoded:
        print(f" DECODED: {d[:200]}")
    print()
EOF
python3 decode_powershell.py
cat > ~/log-lab/apache_analyzer.py << 'EOF'
import re, pandas as pd
from collections import Counter
APACHE_PATTERN = re.compile(
r'(?P\d+\.\d+\.\d+\.\d+) .+ \[(?P