Security-Modul entwickeln: Der Schutzschild

Sicherheit ist kein Feature, sondern eine Grundvoraussetzung. In diesem Kapitel bauen wir das Security-Modul, das alle Eingaben validiert und böse Anfragen blockiert.

Das Security-Modul erstellen

Wir beginnen mit der Grundstruktur unseres Security-Validators:

# modules/security.py
import re
import json
import os
from typing import List, Tuple, Dict, Any, Optional

class SecurityValidator:
    """Zentrale Sicherheitsvalidierung für alle DB-Operationen"""

    def __init__(self, config_dir: str):
        self.config_dir = config_dir
        self.security_config = self._load_security_config()
        self.blocked_patterns = self._compile_blocked_patterns()

    def _load_security_config(self) -> Dict:
        """Lädt die Sicherheitskonfiguration"""
        config_path = os.path.join(self.config_dir, 'security.json')
        with open(config_path, 'r') as f:
            return json.load(f)

    def _compile_blocked_patterns(self) -> List:
        """Kompiliert gefährliche SQL-Muster für schnelle Prüfung"""
        patterns = []
        for keyword in self.security_config['blocked_keywords']:
            # Case-insensitive Muster mit Word-Boundaries
            pattern = re.compile(
                r'\b' + re.escape(keyword) + r'\b',
                re.IGNORECASE
            )
            patterns.append(pattern)
        return patterns

Query-Validierung

Der wichtigste Teil: Prüfen, ob eine Query sicher ist:

    def validate_query(self, query: str, params: Optional[List] = None) -> Tuple[bool, str]:
        """Validiert eine SQL-Query auf Sicherheitsprobleme"""

        # Leere Query ablehnen
        if not query or not query.strip():
            return False, "Query darf nicht leer sein"

        # Zu lange Queries ablehnen
        if len(query) > 10000:
            return False, "Query zu lang (max 10000 Zeichen)"

        # Geblockte Keywords prüfen
        for pattern in self.blocked_patterns:
            if pattern.search(query):
                return False, f"Verbotenes SQL-Keyword gefunden"

        # Multiple Statements verhindern
        if ';' in query.strip()[:-1]:  # ; am Ende ist OK
            return False, "Multiple Statements nicht erlaubt"

        # Parameter-Count prüfen
        if params:
            max_params = self.security_config['parameter_binding']['max_parameters']
            if len(params) > max_params:
                return False, f"Zu viele Parameter (max {max_params})"

        # Kommentare verhindern (können SQL-Injection verstecken)
        if '--' in query or '/*' in query:
            return False, "SQL-Kommentare nicht erlaubt"

        return True, "Query ist sicher"

Die Regex-Patterns mit Word-Boundaries (\b) verhindern, dass normale Wörter wie "dropdown" fälschlicherweise als "DROP" erkannt werden.

Tabellen- und Spalten-Validierung

Wir prüfen, ob nur erlaubte Tabellen und Spalten verwendet werden:

    def validate_table_name(self, table: str) -> bool:
        """Prüft ob Tabellenname erlaubt ist"""
        allowed_tables = self.security_config.get('allowed_tables', [])

        # Nur alphanumerisch und Underscore
        if not re.match(r'^[a-zA-Z0-9_]+$', table):
            return False

        # Muss in Whitelist sein
        return table in allowed_tables

    def validate_column_names(self, columns: List[str]) -> bool:
        """Prüft ob alle Spaltennamen erlaubt sind"""
        for column in columns:
            # Nur alphanumerisch und Underscore
            if not re.match(r'^[a-zA-Z0-9_]+$', column):
                return False

            # Prüfen ob in erlaubten Spalten
            for table, allowed_cols in self.security_config.get('allowed_columns', {}).items():
                if column not in allowed_cols:
                    return False

        return True

WHERE-Klausel Validierung

WHERE-Klauseln sind besonders anfällig für Injection. Hier unsere Prüfung:

    def validate_where_clause(self, where_clause: str) -> Tuple[bool, str]:
        """Validiert WHERE-Klauseln auf gefährliche Muster"""

        if not where_clause:
            return False, "WHERE-Klausel darf nicht leer sein"

        # Gefährliche Funktionen blockieren
        dangerous_functions = [
            'SLEEP', 'BENCHMARK', 'LOAD_FILE',
            'INTO OUTFILE', 'INTO DUMPFILE'
        ]

        where_upper = where_clause.upper()
        for func in dangerous_functions:
            if func in where_upper:
                return False, f"Gefährliche Funktion '{func}' nicht erlaubt"

        # Verschachtelte SELECTs verhindern (Subqueries)
        if 'SELECT' in where_upper:
            return False, "Subqueries in WHERE nicht erlaubt"

        # OR 1=1 und ähnliche Tricks erkennen
        suspicious_patterns = [
            r'OR\s+1\s*=\s*1',
            r'OR\s+["\']1["\']\s*=\s*["\']1["\']',
            r'OR\s+true',
            r'AND\s+1\s*=\s*0',
            r'AND\s+false'
        ]

        for pattern in suspicious_patterns:
            if re.search(pattern, where_clause, re.IGNORECASE):
                return False, "Verdächtiges WHERE-Muster erkannt"

        return True, "WHERE-Klausel ist sicher"

Input-Sanitization

Für Strings und andere Eingaben brauchen wir Sanitization:

    def sanitize_string(self, value: str, max_length: int = 1000) -> str:
        """Bereinigt String-Eingaben"""
        if not value:
            return ""

        # Länge begrenzen
        value = value[:max_length]

        # Control-Characters entfernen (außer Newline und Tab)
        value = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', value)

        # Führende/nachfolgende Whitespaces entfernen
        value = value.strip()

        return value

    def sanitize_identifier(self, identifier: str) -> Optional[str]:
        """Bereinigt Datenbank-Identifier (Tabellen, Spalten)"""
        if not identifier:
            return None

        # Nur alphanumerisch und Underscore
        if not re.match(r'^[a-zA-Z][a-zA-Z0-9_]*$', identifier):
            return None

        # Maximale Länge (MySQL: 64 Zeichen)
        if len(identifier) > 64:
            return None

        return identifier

Rate-Limiting Implementierung

Zu viele Anfragen zu schnell? Das blockieren wir:

    def __init__(self, config_dir: str):
        # ... vorheriger Code ...
        self.query_timestamps = {}  # User -> Liste von Timestamps
        self.rate_limits = self.security_config.get('rate_limiting', {})

    def check_rate_limit(self, user: str) -> Tuple[bool, str]:
        """Prüft ob User sein Rate-Limit überschritten hat"""
        import time

        current_time = time.time()
        queries_per_minute = self.rate_limits.get('queries_per_minute', 60)

        # User-Timestamps holen oder initialisieren
        if user not in self.query_timestamps:
            self.query_timestamps[user] = []

        # Alte Timestamps entfernen (älter als 1 Minute)
        self.query_timestamps[user] = [
            ts for ts in self.query_timestamps[user]
            if current_time - ts < 60
        ]

        # Prüfen ob Limit erreicht
        if len(self.query_timestamps[user]) >= queries_per_minute:
            return False, f"Rate-Limit erreicht ({queries_per_minute} Queries/Minute)"

        # Neuen Timestamp hinzufügen
        self.query_timestamps[user].append(current_time)

        return True, "Rate-Limit OK"

Rate-Limiting verhindert nicht nur DoS-Angriffe, sondern auch versehentliche Endlosschleifen in Client-Code.

Berechtigungs-Prüfung

Nicht jeder darf alles. Hier prüfen wir die Berechtigungen:

    def check_operation_permission(self, user_type: str, operation: str, database: str) -> bool:
        """Prüft ob User-Typ die Operation ausführen darf"""

        # User-Permissions aus Config laden
        with open(os.path.join(self.config_dir, 'credentials.json'), 'r') as f:
            cred_config = json.load(f)

        # Default User-Type wenn nicht angegeben
        if not user_type:
            user_type = cred_config.get('default_user', 'content_reader')

        # User-Config holen
        user_config = cred_config.get('database_users', {}).get(user_type)
        if not user_config:
            return False

        # Datenbank prüfen
        allowed_dbs = user_config.get('databases', [])
        if database not in allowed_dbs:
            return False

        # Operation prüfen
        allowed_ops = user_config.get('permissions', [])
        operation_map = {
            'SELECT': 'SELECT',
            'LIST_TABLES': 'SELECT',
            'DESCRIBE_TABLE': 'SELECT',
            'INSERT': 'INSERT',
            'UPDATE': 'UPDATE',
            'DELETE': 'DELETE'
        }

        required_permission = operation_map.get(operation, operation)
        return required_permission in allowed_ops

Das komplette Security-Modul

Hier nochmal die wichtigsten Methoden zusammengefasst:

# modules/security.py - Zusammenfassung
class SecurityValidator:
    def __init__(self, config_dir: str):
        """Initialisiert Security-Validator mit Config"""

    def validate_query(self, query: str, params: Optional[List]) -> Tuple[bool, str]:
        """Hauptvalidierung für SQL-Queries"""

    def validate_table_name(self, table: str) -> bool:
        """Prüft Tabellennamen"""

    def validate_column_names(self, columns: List[str]) -> bool:
        """Prüft Spaltennamen"""

    def validate_where_clause(self, where: str) -> Tuple[bool, str]:
        """Prüft WHERE-Klauseln"""

    def sanitize_string(self, value: str, max_length: int) -> str:
        """Bereinigt String-Eingaben"""

    def check_rate_limit(self, user: str) -> Tuple[bool, str]:
        """Rate-Limiting pro User"""

    def check_operation_permission(self, user: str, op: str, db: str) -> bool:
        """Prüft Berechtigungen"""

Security-Tests schreiben

Vertrauen ist gut, Tests sind besser:

# test_security.py
#!/usr/bin/env python3
import sys
sys.path.append('.')
from modules.security import SecurityValidator

def test_sql_injection():
    """Testet SQL-Injection Erkennung"""
    validator = SecurityValidator('./config')

    # Diese sollten geblockt werden
    bad_queries = [
        "SELECT * FROM users; DROP TABLE users",
        "SELECT * FROM users WHERE id = 1 OR 1=1",
        "SELECT * FROM users WHERE name = '' OR '1'='1'",
        "SELECT * FROM users -- comment",
        "SELECT * FROM users WHERE id = 1 UNION SELECT * FROM passwords"
    ]

    for query in bad_queries:
        valid, msg = validator.validate_query(query)
        assert not valid, f"Sollte geblockt werden: {query}"
        print(f"BLOCKED: {query[:50]}... - {msg}")

    # Diese sollten durchkommen
    good_queries = [
        "SELECT * FROM content_metadata WHERE id = ?",
        "UPDATE content_metadata SET title = ? WHERE id = ?",
        "DELETE FROM content_metadata WHERE status = ?"
    ]

    for query in good_queries:
        valid, msg = validator.validate_query(query, [1, 2])
        assert valid, f"Sollte erlaubt sein: {query}"
        print(f"ALLOWED: {query[:50]}...")

    print("\nAlle Security-Tests bestanden!")

if __name__ == "__main__":
    test_sql_injection()

Führe diese Tests regelmäßig aus. Besonders nach Änderungen am Security-Modul.

Logging von Security-Events

Jeder geblockte Angriff sollte geloggt werden:

    def log_security_violation(self, violation_type: str, details: Dict[str, Any]):
        """Loggt Sicherheitsverletzungen für spätere Analyse"""
        import logging
        import json
        from datetime import datetime

        logger = logging.getLogger('security')

        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'type': violation_type,
            'details': details
        }

        # In Security-Log schreiben
        logger.warning(f"SECURITY VIOLATION: {json.dumps(log_entry)}")

        # Optional: Support-Ticket erstellen bei kritischen Violations
        if violation_type in ['SQL_INJECTION_ATTEMPT', 'RATE_LIMIT_ABUSE']:
            # Hier könnte ein Support-Ticket erstellt werden
            pass

Was haben wir gebaut?

Unser Security-Modul ist jetzt ein robuster Schutzschild:

Mit diesem Security-Modul ist Dein Server gut geschützt. Als nächstes bauen wir das Database-Modul, das die eigentlichen Datenbankoperationen durchführt. Ich zeige Dir, wie Du sichere Verbindungen aufbaust, CRUD-Operationen implementierst und mit Transaktionen arbeitest. Weiter zum Database-Modul mit allen CRUD-Operationen