Logging-Modul entwickeln: Das Gedächtnis Deines Servers

Ohne Logging bist Du blind. Wenn nachts um 3 Uhr was schiefgeht, sind gute Logs Gold wert.

Das Logging-Framework

Python's eingebautes logging-Modul ist robust und bewährt. Das nutzen wir:

# modules/logging.py
import logging
import logging.handlers
import json
import os
from datetime import datetime
from typing import Dict, Any, Optional

class LogManager:
    """Zentrales Logging-Management mit strukturierten Logs"""

    def __init__(self, config_dir: str):
        self.config_dir = config_dir
        self.config = self._load_config()
        self.loggers = {}
        self._setup_logging()

    def _load_config(self) -> Dict:
        """Lädt die Logging-Konfiguration"""
        config_path = os.path.join(self.config_dir, 'logging.json')
        with open(config_path, 'r') as f:
            return json.load(f)

    def _setup_logging(self):
        """Konfiguriert das Logging-System"""

        # Basis-Konfiguration
        logging.basicConfig(
            level=getattr(logging, self.config['level']),
            format=self.config['format']
        )

        # File Handler wenn aktiviert
        if self.config['handlers']['file']['enabled']:
            self._setup_file_handler()

File Handler mit Rotation

Logs können groß werden. Rotation verhindert, dass die Platte vollläuft:

    def _setup_file_handler(self):
        """Richtet File-Handler mit Rotation ein"""

        file_config = self.config['handlers']['file']
        log_path = file_config['path']

        # Verzeichnis erstellen falls nötig
        log_dir = os.path.dirname(log_path)
        if not os.path.exists(log_dir):
            os.makedirs(log_dir, mode=0o755)

        # Rotating File Handler
        handler = logging.handlers.RotatingFileHandler(
            filename=log_path,
            maxBytes=file_config['max_size_mb'] * 1024 * 1024,
            backupCount=file_config['backup_count']
        )

        # Formatter setzen
        formatter = logging.Formatter(self.config['format'])
        handler.setFormatter(formatter)

        # Handler zu Root-Logger hinzufügen
        logging.getLogger().addHandler(handler)

Mit 10 MB pro Datei und 5 Backups hast Du maximal 50 MB Logs. Das reicht für Monate.

Spezialisierte Logger

Verschiedene Module bekommen eigene Logger:

    def get_logger(self, name: str) -> logging.Logger:
        """Gibt einen benannten Logger zurück"""

        if name not in self.loggers:
            logger = logging.getLogger(name)

            # Spezielles Level setzen wenn konfiguriert
            if name in self.config.get('logger_levels', {}):
                level = self.config['logger_levels'][name]
                logger.setLevel(getattr(logging, level))

            self.loggers[name] = logger

        return self.loggers[name]

Strukturiertes Logging

JSON-Logs sind maschinenlesbar. Das macht Analyse einfacher:

    def log_operation(self,
                      operation: str,
                      details: Dict[str, Any],
                      level: str = 'INFO',
                      logger_name: str = 'operations'):
        """Loggt eine Operation strukturiert"""

        logger = self.get_logger(logger_name)

        # Log-Entry erstellen
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'operation': operation,
            'details': details
        }

        # Als JSON loggen
        log_message = json.dumps(log_entry, ensure_ascii=False)

        # Level-spezifisch loggen
        log_method = getattr(logger, level.lower())
        log_method(log_message)

Query-Logging

Besonders wichtig: Alle Datenbankzugriffe protokollieren:

    def log_query(self,
                  query_type: str,
                  query: str,
                  params: Optional[list] = None,
                  execution_time: Optional[float] = None,
                  affected_rows: Optional[int] = None,
                  error: Optional[str] = None):
        """Loggt Datenbank-Queries mit Details"""

        if not self.config['log_queries']['enabled']:
            return

        logger = self.get_logger('database')

        # Log-Details aufbauen
        details = {
            'type': query_type,
            'query': query[:500] if len(query) > 500 else query  # Truncate lange Queries
        }

        # Optionale Details
        if self.config['log_queries']['log_parameters'] and params:
            details['parameters'] = params[:10]  # Max 10 Parameter loggen

        if self.config['log_queries']['log_execution_time'] and execution_time:
            details['execution_time_ms'] = round(execution_time * 1000, 2)

        if self.config['log_queries']['log_affected_rows'] and affected_rows is not None:
            details['affected_rows'] = affected_rows

        if error:
            details['error'] = error
            level = 'ERROR'
        else:
            level = 'INFO'

        self.log_operation(
            operation=f"query_{query_type.lower()}",
            details=details,
            level=level,
            logger_name='database'
        )

Security-Event Logging

Angriffe und verdächtige Aktivitäten separat loggen:

    def log_security_event(self,
                           event_type: str,
                           details: Dict[str, Any],
                           severity: str = 'WARNING'):
        """Loggt Security-relevante Events"""

        logger = self.get_logger('security')

        # Security-Log erweitern
        security_details = {
            'event_type': event_type,
            'severity': severity,
            'timestamp': datetime.now().isoformat(),
            **details
        }

        # Severity-Mapping
        severity_map = {
            'LOW': 'INFO',
            'MEDIUM': 'WARNING',
            'HIGH': 'ERROR',
            'CRITICAL': 'CRITICAL'
        }

        level = severity_map.get(severity, 'WARNING')

        self.log_operation(
            operation='security_event',
            details=security_details,
            level=level,
            logger_name='security'
        )

        # Bei kritischen Events: Alert
        if severity == 'CRITICAL':
            self._send_alert(security_details)

Performance-Logging

Langsame Queries und Performance-Probleme erkennen:

    def log_performance(self,
                        operation: str,
                        duration_ms: float,
                        threshold_ms: float = 1000):
        """Loggt Performance-Metriken"""

        logger = self.get_logger('performance')

        details = {
            'operation': operation,
            'duration_ms': round(duration_ms, 2),
            'threshold_ms': threshold_ms
        }

        # Warnung bei Überschreitung
        if duration_ms > threshold_ms:
            details['exceeded_threshold'] = True
            level = 'WARNING'
            message = f"Slow operation: {operation} took {duration_ms}ms"
        else:
            level = 'DEBUG'
            message = f"Operation: {operation} completed in {duration_ms}ms"

        log_method = getattr(logger, level.lower())
        log_method(message, extra={'details': details})

Log-Analyse Tools

Logs sind nur nützlich, wenn Du sie auch auswertest:

    def analyze_logs(self,
                     log_file: str = None,
                     start_time: datetime = None,
                     end_time: datetime = None) -> Dict:
        """Analysiert Logs und gibt Statistiken zurück"""

        if not log_file:
            log_file = self.config['handlers']['file']['path']

        stats = {
            'total_entries': 0,
            'errors': 0,
            'warnings': 0,
            'queries': {
                'select': 0,
                'insert': 0,
                'update': 0,
                'delete': 0
            },
            'security_events': 0,
            'slow_queries': []
        }

        with open(log_file, 'r') as f:
            for line in f:
                try:
                    # JSON-Logs parsen
                    if line.startswith('{'):
                        entry = json.loads(line)

                        # Zeitfilter
                        if start_time or end_time:
                            entry_time = datetime.fromisoformat(
                                entry.get('timestamp', '')
                            )
                            if start_time and entry_time < start_time:
                                continue
                            if end_time and entry_time > end_time:
                                continue

                        # Statistiken sammeln
                        stats['total_entries'] += 1

                        # Query-Statistiken
                        if 'query_' in entry.get('operation', ''):
                            query_type = entry['operation'].replace('query_', '')
                            if query_type in stats['queries']:
                                stats['queries'][query_type] += 1

                        # Slow Queries
                        if 'execution_time_ms' in entry.get('details', {}):
                            if entry['details']['execution_time_ms'] > 1000:
                                stats['slow_queries'].append(entry)

                        # Security Events
                        if entry.get('operation') == 'security_event':
                            stats['security_events'] += 1

                    # Text-Logs parsen
                    else:
                        if ' ERROR ' in line:
                            stats['errors'] += 1
                        elif ' WARNING ' in line:
                            stats['warnings'] += 1
                        stats['total_entries'] += 1

                except (json.JSONDecodeError, KeyError):
                    continue

        return stats

Mit dieser Analyse siehst Du auf einen Blick, was in Deinem System passiert.

Alert-System

Bei kritischen Events willst Du sofort informiert werden:

    def _send_alert(self, details: Dict[str, Any]):
        """Sendet Alert bei kritischen Events"""

        # Support-Ticket erstellen
        alert_message = f"""
        KRITISCHES SECURITY EVENT

        Typ: {details.get('event_type')}
        Zeit: {details.get('timestamp')}
        Details: {json.dumps(details, indent=2)}

        Sofortige Überprüfung erforderlich!
        """

        # Hier könnte ein Support-Ticket erstellt werden
        # Oder eine E-Mail gesendet werden
        # Für jetzt nur kritisches Log

        logger = self.get_logger('alerts')
        logger.critical(alert_message)

Log-Rotation Script

Für manuelles Archivieren alter Logs:

#!/usr/bin/env python3
# rotate_logs.py

import os
import gzip
import shutil
from datetime import datetime, timedelta

def rotate_logs(log_dir: str, days_to_keep: int = 30):
    """Archiviert alte Logs"""

    archive_dir = os.path.join(log_dir, 'archive')
    if not os.path.exists(archive_dir):
        os.makedirs(archive_dir)

    cutoff_date = datetime.now() - timedelta(days=days_to_keep)

    for filename in os.listdir(log_dir):
        if not filename.endswith('.log'):
            continue

        filepath = os.path.join(log_dir, filename)

        # Datei-Alter prüfen
        mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
        if mtime < cutoff_date:
            # Komprimieren und archivieren
            archive_name = f"{filename}.{mtime.strftime('%Y%m%d')}.gz"
            archive_path = os.path.join(archive_dir, archive_name)

            with open(filepath, 'rb') as f_in:
                with gzip.open(archive_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

            # Original löschen
            os.remove(filepath)
            print(f"Archiviert: {filename} -> {archive_name}")

if __name__ == "__main__":
    rotate_logs('/var/deinverzeichnis/mcp/content-metadata-server/logs')

Logging Best Practices

Nach einer ganzen Weile in der Praxis haben sich folgende Erkenntnisse herauskristallisiert:

Das komplette Logging-Modul

Zusammengefasst haben wir jetzt:

# modules/logging.py - Übersicht
class LogManager:
    def __init__(self, config_dir: str)
    def get_logger(self, name: str) -> logging.Logger
    def log_operation(self, operation: str, details: Dict)
    def log_query(self, query_type: str, query: str, ...)
    def log_security_event(self, event_type: str, details: Dict)
    def log_performance(self, operation: str, duration_ms: float)
    def analyze_logs(self, log_file: str) -> Dict
    def _send_alert(self, details: Dict)

Was haben wir erreicht?

Unser Logging-System kann jetzt:

Mit diesem Logging-System hast Du immer den vollen Überblick, was in Deinem MCP Server passiert. Im nächsten Kapitel fügen wir alles zusammen und bauen den eigentlichen MCP Server. Du lernst, wie Du mit FastMCP alle Tools definierst, die Module integrierst und Claude Code anbindest. Das wird der spannende Teil, wo aus den einzelnen Bausteinen ein funktionierender Server entsteht. Weiter zur Server-Implementation mit FastMCP