Logging-Modul entwickeln: Das Gedächtnis Deines Servers
Ohne Logging bist Du blind. Wenn nachts um 3 Uhr was schiefgeht, sind gute Logs Gold wert.
Das Logging-Framework
Python's eingebautes logging-Modul ist robust und bewährt. Das nutzen wir:
# modules/logging.py
import logging
import logging.handlers
import json
import os
from datetime import datetime
from typing import Dict, Any, Optional
class LogManager:
"""Zentrales Logging-Management mit strukturierten Logs"""
def __init__(self, config_dir: str):
self.config_dir = config_dir
self.config = self._load_config()
self.loggers = {}
self._setup_logging()
def _load_config(self) -> Dict:
"""Lädt die Logging-Konfiguration"""
config_path = os.path.join(self.config_dir, 'logging.json')
with open(config_path, 'r') as f:
return json.load(f)
def _setup_logging(self):
"""Konfiguriert das Logging-System"""
# Basis-Konfiguration
logging.basicConfig(
level=getattr(logging, self.config['level']),
format=self.config['format']
)
# File Handler wenn aktiviert
if self.config['handlers']['file']['enabled']:
self._setup_file_handler()
File Handler mit Rotation
Logs können groß werden. Rotation verhindert, dass die Platte vollläuft:
def _setup_file_handler(self):
"""Richtet File-Handler mit Rotation ein"""
file_config = self.config['handlers']['file']
log_path = file_config['path']
# Verzeichnis erstellen falls nötig
log_dir = os.path.dirname(log_path)
if not os.path.exists(log_dir):
os.makedirs(log_dir, mode=0o755)
# Rotating File Handler
handler = logging.handlers.RotatingFileHandler(
filename=log_path,
maxBytes=file_config['max_size_mb'] * 1024 * 1024,
backupCount=file_config['backup_count']
)
# Formatter setzen
formatter = logging.Formatter(self.config['format'])
handler.setFormatter(formatter)
# Handler zu Root-Logger hinzufügen
logging.getLogger().addHandler(handler)
Mit 10 MB pro Datei und 5 Backups hast Du maximal 50 MB Logs. Das reicht für Monate.
Spezialisierte Logger
Verschiedene Module bekommen eigene Logger:
def get_logger(self, name: str) -> logging.Logger:
"""Gibt einen benannten Logger zurück"""
if name not in self.loggers:
logger = logging.getLogger(name)
# Spezielles Level setzen wenn konfiguriert
if name in self.config.get('logger_levels', {}):
level = self.config['logger_levels'][name]
logger.setLevel(getattr(logging, level))
self.loggers[name] = logger
return self.loggers[name]
Strukturiertes Logging
JSON-Logs sind maschinenlesbar. Das macht Analyse einfacher:
def log_operation(self,
operation: str,
details: Dict[str, Any],
level: str = 'INFO',
logger_name: str = 'operations'):
"""Loggt eine Operation strukturiert"""
logger = self.get_logger(logger_name)
# Log-Entry erstellen
log_entry = {
'timestamp': datetime.now().isoformat(),
'operation': operation,
'details': details
}
# Als JSON loggen
log_message = json.dumps(log_entry, ensure_ascii=False)
# Level-spezifisch loggen
log_method = getattr(logger, level.lower())
log_method(log_message)
Query-Logging
Besonders wichtig: Alle Datenbankzugriffe protokollieren:
def log_query(self,
query_type: str,
query: str,
params: Optional[list] = None,
execution_time: Optional[float] = None,
affected_rows: Optional[int] = None,
error: Optional[str] = None):
"""Loggt Datenbank-Queries mit Details"""
if not self.config['log_queries']['enabled']:
return
logger = self.get_logger('database')
# Log-Details aufbauen
details = {
'type': query_type,
'query': query[:500] if len(query) > 500 else query # Truncate lange Queries
}
# Optionale Details
if self.config['log_queries']['log_parameters'] and params:
details['parameters'] = params[:10] # Max 10 Parameter loggen
if self.config['log_queries']['log_execution_time'] and execution_time:
details['execution_time_ms'] = round(execution_time * 1000, 2)
if self.config['log_queries']['log_affected_rows'] and affected_rows is not None:
details['affected_rows'] = affected_rows
if error:
details['error'] = error
level = 'ERROR'
else:
level = 'INFO'
self.log_operation(
operation=f"query_{query_type.lower()}",
details=details,
level=level,
logger_name='database'
)
Security-Event Logging
Angriffe und verdächtige Aktivitäten separat loggen:
def log_security_event(self,
event_type: str,
details: Dict[str, Any],
severity: str = 'WARNING'):
"""Loggt Security-relevante Events"""
logger = self.get_logger('security')
# Security-Log erweitern
security_details = {
'event_type': event_type,
'severity': severity,
'timestamp': datetime.now().isoformat(),
**details
}
# Severity-Mapping
severity_map = {
'LOW': 'INFO',
'MEDIUM': 'WARNING',
'HIGH': 'ERROR',
'CRITICAL': 'CRITICAL'
}
level = severity_map.get(severity, 'WARNING')
self.log_operation(
operation='security_event',
details=security_details,
level=level,
logger_name='security'
)
# Bei kritischen Events: Alert
if severity == 'CRITICAL':
self._send_alert(security_details)
Performance-Logging
Langsame Queries und Performance-Probleme erkennen:
def log_performance(self,
operation: str,
duration_ms: float,
threshold_ms: float = 1000):
"""Loggt Performance-Metriken"""
logger = self.get_logger('performance')
details = {
'operation': operation,
'duration_ms': round(duration_ms, 2),
'threshold_ms': threshold_ms
}
# Warnung bei Überschreitung
if duration_ms > threshold_ms:
details['exceeded_threshold'] = True
level = 'WARNING'
message = f"Slow operation: {operation} took {duration_ms}ms"
else:
level = 'DEBUG'
message = f"Operation: {operation} completed in {duration_ms}ms"
log_method = getattr(logger, level.lower())
log_method(message, extra={'details': details})
Log-Analyse Tools
Logs sind nur nützlich, wenn Du sie auch auswertest:
def analyze_logs(self,
log_file: str = None,
start_time: datetime = None,
end_time: datetime = None) -> Dict:
"""Analysiert Logs und gibt Statistiken zurück"""
if not log_file:
log_file = self.config['handlers']['file']['path']
stats = {
'total_entries': 0,
'errors': 0,
'warnings': 0,
'queries': {
'select': 0,
'insert': 0,
'update': 0,
'delete': 0
},
'security_events': 0,
'slow_queries': []
}
with open(log_file, 'r') as f:
for line in f:
try:
# JSON-Logs parsen
if line.startswith('{'):
entry = json.loads(line)
# Zeitfilter
if start_time or end_time:
entry_time = datetime.fromisoformat(
entry.get('timestamp', '')
)
if start_time and entry_time < start_time:
continue
if end_time and entry_time > end_time:
continue
# Statistiken sammeln
stats['total_entries'] += 1
# Query-Statistiken
if 'query_' in entry.get('operation', ''):
query_type = entry['operation'].replace('query_', '')
if query_type in stats['queries']:
stats['queries'][query_type] += 1
# Slow Queries
if 'execution_time_ms' in entry.get('details', {}):
if entry['details']['execution_time_ms'] > 1000:
stats['slow_queries'].append(entry)
# Security Events
if entry.get('operation') == 'security_event':
stats['security_events'] += 1
# Text-Logs parsen
else:
if ' ERROR ' in line:
stats['errors'] += 1
elif ' WARNING ' in line:
stats['warnings'] += 1
stats['total_entries'] += 1
except (json.JSONDecodeError, KeyError):
continue
return stats
Mit dieser Analyse siehst Du auf einen Blick, was in Deinem System passiert.
Alert-System
Bei kritischen Events willst Du sofort informiert werden:
def _send_alert(self, details: Dict[str, Any]):
"""Sendet Alert bei kritischen Events"""
# Support-Ticket erstellen
alert_message = f"""
KRITISCHES SECURITY EVENT
Typ: {details.get('event_type')}
Zeit: {details.get('timestamp')}
Details: {json.dumps(details, indent=2)}
Sofortige Überprüfung erforderlich!
"""
# Hier könnte ein Support-Ticket erstellt werden
# Oder eine E-Mail gesendet werden
# Für jetzt nur kritisches Log
logger = self.get_logger('alerts')
logger.critical(alert_message)
Log-Rotation Script
Für manuelles Archivieren alter Logs:
#!/usr/bin/env python3
# rotate_logs.py
import os
import gzip
import shutil
from datetime import datetime, timedelta
def rotate_logs(log_dir: str, days_to_keep: int = 30):
"""Archiviert alte Logs"""
archive_dir = os.path.join(log_dir, 'archive')
if not os.path.exists(archive_dir):
os.makedirs(archive_dir)
cutoff_date = datetime.now() - timedelta(days=days_to_keep)
for filename in os.listdir(log_dir):
if not filename.endswith('.log'):
continue
filepath = os.path.join(log_dir, filename)
# Datei-Alter prüfen
mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
if mtime < cutoff_date:
# Komprimieren und archivieren
archive_name = f"{filename}.{mtime.strftime('%Y%m%d')}.gz"
archive_path = os.path.join(archive_dir, archive_name)
with open(filepath, 'rb') as f_in:
with gzip.open(archive_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
# Original löschen
os.remove(filepath)
print(f"Archiviert: {filename} -> {archive_name}")
if __name__ == "__main__":
rotate_logs('/var/deinverzeichnis/mcp/content-metadata-server/logs')
Logging Best Practices
Nach einer ganzen Weile in der Praxis haben sich folgende Erkenntnisse herauskristallisiert:
- Log früh, log oft: Lieber zu viel als zu wenig
- Strukturierte Logs: JSON macht Analyse einfach
- Keine sensiblen Daten: Passwörter gehören NIE in Logs
- Performance beachten: Logging darf nicht bremsen
- Rotation einrichten: Volle Platten sind kein Spaß
- Separate Logger: Security, Performance, Operations trennen
- Alerts bei Kritischem: Sofort reagieren können
Das komplette Logging-Modul
Zusammengefasst haben wir jetzt:
# modules/logging.py - Übersicht
class LogManager:
def __init__(self, config_dir: str)
def get_logger(self, name: str) -> logging.Logger
def log_operation(self, operation: str, details: Dict)
def log_query(self, query_type: str, query: str, ...)
def log_security_event(self, event_type: str, details: Dict)
def log_performance(self, operation: str, duration_ms: float)
def analyze_logs(self, log_file: str) -> Dict
def _send_alert(self, details: Dict)
Was haben wir erreicht?
Unser Logging-System kann jetzt:
- Strukturierte Logs im JSON-Format schreiben
- Automatische Log-Rotation bei Größenüberschreitung
- Separate Logger für verschiedene Module
- Query-Logging mit Performance-Metriken
- Security-Event Tracking
- Performance-Monitoring
- Log-Analyse und Statistiken
- Alert-System für kritische Events
Mit diesem Logging-System hast Du immer den vollen Überblick, was in Deinem MCP Server passiert. Im nächsten Kapitel fügen wir alles zusammen und bauen den eigentlichen MCP Server. Du lernst, wie Du mit FastMCP alle Tools definierst, die Module integrierst und Claude Code anbindest. Das wird der spannende Teil, wo aus den einzelnen Bausteinen ein funktionierender Server entsteht. Weiter zur Server-Implementation mit FastMCP