Security-Modul entwickeln: Der Schutzschild
Sicherheit ist kein Feature, sondern eine Grundvoraussetzung. In diesem Kapitel bauen wir das Security-Modul, das alle Eingaben validiert und böse Anfragen blockiert.
Das Security-Modul erstellen
Wir beginnen mit der Grundstruktur unseres Security-Validators:
# modules/security.py
import re
import json
import os
from typing import List, Tuple, Dict, Any, Optional
class SecurityValidator:
"""Zentrale Sicherheitsvalidierung für alle DB-Operationen"""
def __init__(self, config_dir: str):
self.config_dir = config_dir
self.security_config = self._load_security_config()
self.blocked_patterns = self._compile_blocked_patterns()
def _load_security_config(self) -> Dict:
"""Lädt die Sicherheitskonfiguration"""
config_path = os.path.join(self.config_dir, 'security.json')
with open(config_path, 'r') as f:
return json.load(f)
def _compile_blocked_patterns(self) -> List:
"""Kompiliert gefährliche SQL-Muster für schnelle Prüfung"""
patterns = []
for keyword in self.security_config['blocked_keywords']:
# Case-insensitive Muster mit Word-Boundaries
pattern = re.compile(
r'\b' + re.escape(keyword) + r'\b',
re.IGNORECASE
)
patterns.append(pattern)
return patterns
Query-Validierung
Der wichtigste Teil: Prüfen, ob eine Query sicher ist:
def validate_query(self, query: str, params: Optional[List] = None) -> Tuple[bool, str]:
"""Validiert eine SQL-Query auf Sicherheitsprobleme"""
# Leere Query ablehnen
if not query or not query.strip():
return False, "Query darf nicht leer sein"
# Zu lange Queries ablehnen
if len(query) > 10000:
return False, "Query zu lang (max 10000 Zeichen)"
# Geblockte Keywords prüfen
for pattern in self.blocked_patterns:
if pattern.search(query):
return False, f"Verbotenes SQL-Keyword gefunden"
# Multiple Statements verhindern
if ';' in query.strip()[:-1]: # ; am Ende ist OK
return False, "Multiple Statements nicht erlaubt"
# Parameter-Count prüfen
if params:
max_params = self.security_config['parameter_binding']['max_parameters']
if len(params) > max_params:
return False, f"Zu viele Parameter (max {max_params})"
# Kommentare verhindern (können SQL-Injection verstecken)
if '--' in query or '/*' in query:
return False, "SQL-Kommentare nicht erlaubt"
return True, "Query ist sicher"
Die Regex-Patterns mit Word-Boundaries (\b) verhindern, dass normale Wörter wie "dropdown" fälschlicherweise als "DROP" erkannt werden.
Tabellen- und Spalten-Validierung
Wir prüfen, ob nur erlaubte Tabellen und Spalten verwendet werden:
def validate_table_name(self, table: str) -> bool:
"""Prüft ob Tabellenname erlaubt ist"""
allowed_tables = self.security_config.get('allowed_tables', [])
# Nur alphanumerisch und Underscore
if not re.match(r'^[a-zA-Z0-9_]+$', table):
return False
# Muss in Whitelist sein
return table in allowed_tables
def validate_column_names(self, columns: List[str]) -> bool:
"""Prüft ob alle Spaltennamen erlaubt sind"""
for column in columns:
# Nur alphanumerisch und Underscore
if not re.match(r'^[a-zA-Z0-9_]+$', column):
return False
# Prüfen ob in erlaubten Spalten
for table, allowed_cols in self.security_config.get('allowed_columns', {}).items():
if column not in allowed_cols:
return False
return True
WHERE-Klausel Validierung
WHERE-Klauseln sind besonders anfällig für Injection. Hier unsere Prüfung:
def validate_where_clause(self, where_clause: str) -> Tuple[bool, str]:
"""Validiert WHERE-Klauseln auf gefährliche Muster"""
if not where_clause:
return False, "WHERE-Klausel darf nicht leer sein"
# Gefährliche Funktionen blockieren
dangerous_functions = [
'SLEEP', 'BENCHMARK', 'LOAD_FILE',
'INTO OUTFILE', 'INTO DUMPFILE'
]
where_upper = where_clause.upper()
for func in dangerous_functions:
if func in where_upper:
return False, f"Gefährliche Funktion '{func}' nicht erlaubt"
# Verschachtelte SELECTs verhindern (Subqueries)
if 'SELECT' in where_upper:
return False, "Subqueries in WHERE nicht erlaubt"
# OR 1=1 und ähnliche Tricks erkennen
suspicious_patterns = [
r'OR\s+1\s*=\s*1',
r'OR\s+["\']1["\']\s*=\s*["\']1["\']',
r'OR\s+true',
r'AND\s+1\s*=\s*0',
r'AND\s+false'
]
for pattern in suspicious_patterns:
if re.search(pattern, where_clause, re.IGNORECASE):
return False, "Verdächtiges WHERE-Muster erkannt"
return True, "WHERE-Klausel ist sicher"
Input-Sanitization
Für Strings und andere Eingaben brauchen wir Sanitization:
def sanitize_string(self, value: str, max_length: int = 1000) -> str:
"""Bereinigt String-Eingaben"""
if not value:
return ""
# Länge begrenzen
value = value[:max_length]
# Control-Characters entfernen (außer Newline und Tab)
value = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', value)
# Führende/nachfolgende Whitespaces entfernen
value = value.strip()
return value
def sanitize_identifier(self, identifier: str) -> Optional[str]:
"""Bereinigt Datenbank-Identifier (Tabellen, Spalten)"""
if not identifier:
return None
# Nur alphanumerisch und Underscore
if not re.match(r'^[a-zA-Z][a-zA-Z0-9_]*$', identifier):
return None
# Maximale Länge (MySQL: 64 Zeichen)
if len(identifier) > 64:
return None
return identifier
Rate-Limiting Implementierung
Zu viele Anfragen zu schnell? Das blockieren wir:
def __init__(self, config_dir: str):
# ... vorheriger Code ...
self.query_timestamps = {} # User -> Liste von Timestamps
self.rate_limits = self.security_config.get('rate_limiting', {})
def check_rate_limit(self, user: str) -> Tuple[bool, str]:
"""Prüft ob User sein Rate-Limit überschritten hat"""
import time
current_time = time.time()
queries_per_minute = self.rate_limits.get('queries_per_minute', 60)
# User-Timestamps holen oder initialisieren
if user not in self.query_timestamps:
self.query_timestamps[user] = []
# Alte Timestamps entfernen (älter als 1 Minute)
self.query_timestamps[user] = [
ts for ts in self.query_timestamps[user]
if current_time - ts < 60
]
# Prüfen ob Limit erreicht
if len(self.query_timestamps[user]) >= queries_per_minute:
return False, f"Rate-Limit erreicht ({queries_per_minute} Queries/Minute)"
# Neuen Timestamp hinzufügen
self.query_timestamps[user].append(current_time)
return True, "Rate-Limit OK"
Rate-Limiting verhindert nicht nur DoS-Angriffe, sondern auch versehentliche Endlosschleifen in Client-Code.
Berechtigungs-Prüfung
Nicht jeder darf alles. Hier prüfen wir die Berechtigungen:
def check_operation_permission(self, user_type: str, operation: str, database: str) -> bool:
"""Prüft ob User-Typ die Operation ausführen darf"""
# User-Permissions aus Config laden
with open(os.path.join(self.config_dir, 'credentials.json'), 'r') as f:
cred_config = json.load(f)
# Default User-Type wenn nicht angegeben
if not user_type:
user_type = cred_config.get('default_user', 'content_reader')
# User-Config holen
user_config = cred_config.get('database_users', {}).get(user_type)
if not user_config:
return False
# Datenbank prüfen
allowed_dbs = user_config.get('databases', [])
if database not in allowed_dbs:
return False
# Operation prüfen
allowed_ops = user_config.get('permissions', [])
operation_map = {
'SELECT': 'SELECT',
'LIST_TABLES': 'SELECT',
'DESCRIBE_TABLE': 'SELECT',
'INSERT': 'INSERT',
'UPDATE': 'UPDATE',
'DELETE': 'DELETE'
}
required_permission = operation_map.get(operation, operation)
return required_permission in allowed_ops
Das komplette Security-Modul
Hier nochmal die wichtigsten Methoden zusammengefasst:
# modules/security.py - Zusammenfassung
class SecurityValidator:
def __init__(self, config_dir: str):
"""Initialisiert Security-Validator mit Config"""
def validate_query(self, query: str, params: Optional[List]) -> Tuple[bool, str]:
"""Hauptvalidierung für SQL-Queries"""
def validate_table_name(self, table: str) -> bool:
"""Prüft Tabellennamen"""
def validate_column_names(self, columns: List[str]) -> bool:
"""Prüft Spaltennamen"""
def validate_where_clause(self, where: str) -> Tuple[bool, str]:
"""Prüft WHERE-Klauseln"""
def sanitize_string(self, value: str, max_length: int) -> str:
"""Bereinigt String-Eingaben"""
def check_rate_limit(self, user: str) -> Tuple[bool, str]:
"""Rate-Limiting pro User"""
def check_operation_permission(self, user: str, op: str, db: str) -> bool:
"""Prüft Berechtigungen"""
Security-Tests schreiben
Vertrauen ist gut, Tests sind besser:
# test_security.py
#!/usr/bin/env python3
import sys
sys.path.append('.')
from modules.security import SecurityValidator
def test_sql_injection():
"""Testet SQL-Injection Erkennung"""
validator = SecurityValidator('./config')
# Diese sollten geblockt werden
bad_queries = [
"SELECT * FROM users; DROP TABLE users",
"SELECT * FROM users WHERE id = 1 OR 1=1",
"SELECT * FROM users WHERE name = '' OR '1'='1'",
"SELECT * FROM users -- comment",
"SELECT * FROM users WHERE id = 1 UNION SELECT * FROM passwords"
]
for query in bad_queries:
valid, msg = validator.validate_query(query)
assert not valid, f"Sollte geblockt werden: {query}"
print(f"BLOCKED: {query[:50]}... - {msg}")
# Diese sollten durchkommen
good_queries = [
"SELECT * FROM content_metadata WHERE id = ?",
"UPDATE content_metadata SET title = ? WHERE id = ?",
"DELETE FROM content_metadata WHERE status = ?"
]
for query in good_queries:
valid, msg = validator.validate_query(query, [1, 2])
assert valid, f"Sollte erlaubt sein: {query}"
print(f"ALLOWED: {query[:50]}...")
print("\nAlle Security-Tests bestanden!")
if __name__ == "__main__":
test_sql_injection()
Führe diese Tests regelmäßig aus. Besonders nach Änderungen am Security-Modul.
Logging von Security-Events
Jeder geblockte Angriff sollte geloggt werden:
def log_security_violation(self, violation_type: str, details: Dict[str, Any]):
"""Loggt Sicherheitsverletzungen für spätere Analyse"""
import logging
import json
from datetime import datetime
logger = logging.getLogger('security')
log_entry = {
'timestamp': datetime.now().isoformat(),
'type': violation_type,
'details': details
}
# In Security-Log schreiben
logger.warning(f"SECURITY VIOLATION: {json.dumps(log_entry)}")
# Optional: Support-Ticket erstellen bei kritischen Violations
if violation_type in ['SQL_INJECTION_ATTEMPT', 'RATE_LIMIT_ABUSE']:
# Hier könnte ein Support-Ticket erstellt werden
pass
Was haben wir gebaut?
Unser Security-Modul ist jetzt ein robuster Schutzschild:
- SQL-Injection Prevention durch Pattern-Matching
- Whitelist-basierte Tabellen/Spalten-Validierung
- WHERE-Klausel Analyse gegen bekannte Tricks
- Input-Sanitization für alle String-Eingaben
- Rate-Limiting gegen Überlastung
- Berechtigungsprüfung auf User-Ebene
- Security-Event Logging
Mit diesem Security-Modul ist Dein Server gut geschützt. Als nächstes bauen wir das Database-Modul, das die eigentlichen Datenbankoperationen durchführt. Ich zeige Dir, wie Du sichere Verbindungen aufbaust, CRUD-Operationen implementierst und mit Transaktionen arbeitest. Weiter zum Database-Modul mit allen CRUD-Operationen