MCP Server implementieren: Alles zusammenfügen
Jetzt kommt der spannende Teil: Wir fügen alle Module zusammen und bauen den eigentlichen MCP Server mit FastMCP.
Der Hauptserver
Hier ist das Herzstück - die server.py:
#!/usr/bin/env python3
# server.py - MCP Content Metadata Server
import os
import sys
import logging
from typing import Dict, Any, Optional, List
from fastmcp import FastMCP
# Module importieren
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from modules.database import DatabaseManager
from modules.security import SecurityValidator
from modules.logging import LogManager
# Server initialisieren
mcp = FastMCP("Content Metadata Server")
# Globale Manager
config_dir = os.environ.get('DATENBANK_CONFIG_DIR', './config')
log_manager = LogManager(config_dir)
security = SecurityValidator(config_dir)
db_manager = None # Wird bei Bedarf initialisiert
# Logger setup
logger = log_manager.get_logger('server')
logger.info("MCP Server startet...")
def get_db_connection():
"""Lazy-Loading für Datenbankverbindung"""
global db_manager
if db_manager is None:
db_manager = DatabaseManager(config_dir)
db_manager.connect()
return db_manager
Tool-Definitionen
Jede Operation wird als Tool registriert:
@mcp.tool()
async def list_tables() -> Dict[str, Any]:
"""
Listet alle verfügbaren Tabellen der Datenbank auf.
Nützlich um zu sehen, welche Tabellen verfügbar sind.
"""
try:
db = get_db_connection()
success, result = db.list_tables()
if success:
return {
"success": True,
"tables": result,
"count": len(result)
}
else:
return {
"success": False,
"error": result
}
except Exception as e:
logger.error(f"list_tables error: {e}")
return {
"success": False,
"error": str(e)
}
@mcp.tool()
async def describe_table(table: str = None) -> Dict[str, Any]:
"""
Zeigt die Struktur einer Tabelle (Spalten, Typen, etc).
Args:
table: Tabellenname (optional, default: content_metadata)
"""
try:
db = get_db_connection()
# Security Check
if table and not security.validate_table_name(table):
return {
"success": False,
"error": "Ungültiger Tabellenname"
}
success, result = db.describe_table(table)
if success:
return {
"success": True,
"table": table or "content_metadata",
"structure": result
}
else:
return {
"success": False,
"error": result
}
except Exception as e:
logger.error(f"describe_table error: {e}")
return {
"success": False,
"error": str(e)
}
SELECT-Tool
Das wichtigste Tool - Daten abfragen:
@mcp.tool()
async def select_data(
columns: Optional[List[str]] = None,
where: Optional[str] = None,
params: Optional[List] = None,
order_by: Optional[str] = None,
limit: Optional[int] = None
) -> Dict[str, Any]:
"""
Führt eine SELECT-Query auf content_metadata aus.
Args:
columns: Liste der Spalten die zurückgegeben werden sollen
where: WHERE-Klausel (ohne WHERE keyword)
params: Parameter für prepared statements
order_by: ORDER BY Klausel (z.B. "created_at DESC")
limit: Maximale Anzahl Ergebnisse (max 100)
Example:
select_data(
columns=["id", "title", "status"],
where="status = ?",
params=["published"],
limit=10
)
"""
try:
db = get_db_connection()
# Rate Limiting
user = "default" # Könnte aus Context kommen
rate_ok, rate_msg = security.check_rate_limit(user)
if not rate_ok:
return {
"success": False,
"error": rate_msg
}
# Query ausführen
success, result = db.select(
columns=columns,
where=where,
params=params,
order_by=order_by,
limit=limit
)
# Logging
log_manager.log_query(
query_type="SELECT",
query=f"SELECT {columns or '*'} FROM content_metadata",
params=params,
affected_rows=len(result) if success else None,
error=result if not success else None
)
if success:
return {
"success": True,
"data": result,
"count": len(result)
}
else:
return {
"success": False,
"error": result
}
except Exception as e:
logger.error(f"select_data error: {e}")
return {
"success": False,
"error": str(e)
}
INSERT-Tool
Neue Datensätze hinzufügen:
@mcp.tool()
async def insert_data(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Fügt einen neuen Datensatz in content_metadata ein.
Args:
data: Dictionary mit Spalten und Werten
Example:
insert_data({
"title": "Neue Seite",
"meta_description": "Beschreibung",
"status": "draft"
})
"""
try:
db = get_db_connection()
# Pflichtfelder prüfen
if not data.get('title'):
return {
"success": False,
"error": "Title ist ein Pflichtfeld"
}
# Defaults setzen
if 'status' not in data:
data['status'] = 'draft'
if 'template' not in data:
data['template'] = '100-template'
# User-Type für Berechtigung
user_type = 'content_writer' # Könnte aus Context kommen
# Berechtigung prüfen
if not security.check_operation_permission(
user_type, 'INSERT', 'karlkratz_de'
):
return {
"success": False,
"error": "Keine Berechtigung für INSERT"
}
# Datenbank-Operation
db.connect(user_type)
success, result = db.insert(data)
# Logging
log_manager.log_query(
query_type="INSERT",
query="INSERT INTO content_metadata",
affected_rows=result.get('affected_rows') if success else None,
error=result if not success else None
)
if success:
return {
"success": True,
"insert_id": result['insert_id'],
"message": f"Datensatz mit ID {result['insert_id']} eingefügt"
}
else:
return {
"success": False,
"error": result
}
except Exception as e:
logger.error(f"insert_data error: {e}")
db.rollback_transaction()
return {
"success": False,
"error": str(e)
}
UPDATE-Tool
Bestehende Daten ändern:
@mcp.tool()
async def update_data(
data: Dict[str, Any],
where: str,
params: Optional[List] = None
) -> Dict[str, Any]:
"""
Aktualisiert Datensätze in content_metadata.
Args:
data: Dictionary mit zu aktualisierenden Spalten
where: WHERE-Klausel (ohne WHERE keyword)
params: Parameter für WHERE-Klausel
Example:
update_data(
data={"status": "published"},
where="id = ?",
params=[42]
)
"""
try:
db = get_db_connection()
# Berechtigung prüfen
user_type = 'content_writer'
if not security.check_operation_permission(
user_type, 'UPDATE', 'karlkratz_de'
):
return {
"success": False,
"error": "Keine Berechtigung für UPDATE"
}
# Timestamp aktualisieren
data['updated_at'] = 'CURRENT_TIMESTAMP'
# Datenbank-Operation
db.connect(user_type)
success, result = db.update(data, where, params)
# Logging
log_manager.log_query(
query_type="UPDATE",
query="UPDATE content_metadata",
params=params,
affected_rows=result.get('affected_rows') if success else None,
error=result if not success else None
)
if success:
return {
"success": True,
"affected_rows": result['affected_rows'],
"message": f"{result['affected_rows']} Datensätze aktualisiert"
}
else:
return {
"success": False,
"error": result
}
except Exception as e:
logger.error(f"update_data error: {e}")
db.rollback_transaction()
return {
"success": False,
"error": str(e)
}
DELETE-Tool
Datensätze löschen (mit Vorsicht!):
@mcp.tool()
async def delete_data(
where: str,
params: Optional[List] = None
) -> Dict[str, Any]:
"""
Löscht Datensätze aus content_metadata.
VORSICHT: Diese Operation kann nicht rückgängig gemacht werden!
Args:
where: WHERE-Klausel (ohne WHERE keyword)
params: Parameter für WHERE-Klausel
Example:
delete_data(
where="status = ? AND created_at < ?",
params=["draft", "2024-01-01"]
)
"""
try:
db = get_db_connection()
# Extra Sicherheitsabfrage
if not where or where.strip().lower() in ['1=1', 'true']:
return {
"success": False,
"error": "Unsichere WHERE-Klausel"
}
# Berechtigung prüfen
user_type = 'content_writer'
if not security.check_operation_permission(
user_type, 'DELETE', 'karlkratz_de'
):
return {
"success": False,
"error": "Keine Berechtigung für DELETE"
}
# Datenbank-Operation
db.connect(user_type)
success, result = db.delete(where, params)
# Logging (besonders wichtig bei DELETE!)
log_manager.log_query(
query_type="DELETE",
query="DELETE FROM content_metadata",
params=params,
affected_rows=result.get('affected_rows') if success else None,
error=result if not success else None
)
if success:
# Security Event bei vielen gelöschten Zeilen
if result['affected_rows'] > 5:
log_manager.log_security_event(
event_type='MASS_DELETE',
details={
'affected_rows': result['affected_rows'],
'where': where
},
severity='HIGH'
)
return {
"success": True,
"affected_rows": result['affected_rows'],
"message": f"{result['affected_rows']} Datensätze gelöscht"
}
else:
return {
"success": False,
"error": result
}
except Exception as e:
logger.error(f"delete_data error: {e}")
db.rollback_transaction()
return {
"success": False,
"error": str(e)
}
Utility-Tools
Zusätzliche nützliche Tools:
@mcp.tool()
async def count_records(
where: Optional[str] = None,
params: Optional[List] = None
) -> Dict[str, Any]:
"""
Zählt Datensätze in content_metadata.
Args:
where: Optionale WHERE-Klausel
params: Parameter für WHERE-Klausel
"""
try:
db = get_db_connection()
success, count = db.count_records(where, params)
if success:
return {
"success": True,
"count": count
}
else:
return {
"success": False,
"error": count
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
@mcp.tool()
async def get_log_stats(
start_time: Optional[str] = None,
end_time: Optional[str] = None
) -> Dict[str, Any]:
"""
Gibt Statistiken über die Server-Nutzung zurück.
Args:
start_time: Start-Zeitpunkt (ISO format)
end_time: End-Zeitpunkt (ISO format)
"""
try:
from datetime import datetime
start = datetime.fromisoformat(start_time) if start_time else None
end = datetime.fromisoformat(end_time) if end_time else None
stats = log_manager.analyze_logs(
start_time=start,
end_time=end
)
return {
"success": True,
"stats": stats
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
Server starten
Der finale Teil - Server hochfahren:
if __name__ == "__main__":
import asyncio
# Server Info ausgeben
print(f"""
╔════════════════════════════════════════╗
║ Content Metadata MCP Server v1.0 ║
╠════════════════════════════════════════╣
║ Config Dir: {config_dir:<25} ║
║ Target Table: content_metadata ║
║ Security: Enabled ║
║ Logging: Enabled ║
╚════════════════════════════════════════╝
""")
# Server starten
try:
logger.info("Server bereit für Verbindungen")
asyncio.run(mcp.run())
except KeyboardInterrupt:
logger.info("Server wird heruntergefahren...")
if db_manager:
db_manager.disconnect()
print("\nServer beendet.")
except Exception as e:
logger.error(f"Server-Fehler: {e}")
raise
Claude Code Integration
Damit Claude den Server findet, brauchst Du diese Konfiguration:
# ~/.claude/mcp.json
{
"mcpServers": {
"content-metadata": {
"command": "/var/deinverzeichnis/mcp/content-metadata-server/venv/bin/python",
"args": ["/var/deinverzeichnis/mcp/content-metadata-server/server.py"],
"env": {
"DATENBANK_CONFIG_DIR": "/var/deinverzeichnis/mcp/content-metadata-server/config"
}
}
}
}
Server testen
Teste ob alles funktioniert:
# test_server.py
#!/usr/bin/env python3
import asyncio
import sys
sys.path.append('.')
# Server-Module importieren
from server import (
list_tables,
describe_table,
select_data,
count_records
)
async def test_server():
"""Testet die Server-Funktionen"""
print("=== MCP Server Test ===\n")
# Test 1: Tabellen auflisten
print("1. Tabellen auflisten:")
result = await list_tables()
if result['success']:
print(f" Gefunden: {result['count']} Tabellen")
else:
print(f" FEHLER: {result['error']}")
# Test 2: Tabellenstruktur
print("\n2. Tabellenstruktur:")
result = await describe_table()
if result['success']:
print(f" Spalten: {len(result['structure'])}")
else:
print(f" FEHLER: {result['error']}")
# Test 3: Daten abfragen
print("\n3. Daten abfragen:")
result = await select_data(
columns=["id", "title", "status"],
where="status = ?",
params=["published"],
limit=5
)
if result['success']:
print(f" Gefunden: {result['count']} Datensätze")
for row in result['data']:
print(f" - [{row['id']}] {row['title'][:50]}...")
else:
print(f" FEHLER: {result['error']}")
# Test 4: Anzahl zählen
print("\n4. Datensätze zählen:")
result = await count_records()
if result['success']:
print(f" Gesamt: {result['count']} Datensätze")
else:
print(f" FEHLER: {result['error']}")
print("\n=== Test abgeschlossen ===")
if __name__ == "__main__":
asyncio.run(test_server())
Führe den Test aus mit: python test_server.py
Troubleshooting
Falls was nicht klappt:
Server startet nicht:
- Virtual Environment aktiviert?
source venv/bin/activate - Alle Dependencies installiert?
pip list - Config-Dateien vorhanden?
ls config/
Keine Verbindung zur Datenbank:
- MariaDB läuft?
sudo systemctl status mariadb - Credentials richtig? Check credentials.json
- Benutzer existiert?
mysql -u content_reader -p
Claude findet Server nicht:
- mcp.json am richtigen Ort?
~/.claude/mcp.json - Pfade absolut angegeben?
- Claude neu gestartet nach Config-Änderung?
Performance-Optimierungen
Für Production-Einsatz:
# Connection Pooling hinzufügen
from pymysql import connections
class ConnectionPool:
def __init__(self, size=5):
self.pool = []
self.size = size
def get_connection(self):
if self.pool:
return self.pool.pop()
else:
return self._create_connection()
def return_connection(self, conn):
if len(self.pool) < self.size:
self.pool.append(conn)
else:
conn.close()
# Async Support
import aiomysql
async def async_query(query, params):
async with aiomysql.connect(
host='localhost',
port=3306,
user='content_reader',
password='xxx',
db='karlkratz_de'
) as conn:
async with conn.cursor() as cursor:
await cursor.execute(query, params)
return await cursor.fetchall()
Was haben wir gebaut?
Dein MCP Server kann jetzt:
- Alle CRUD-Operationen auf content_metadata ausführen
- Volle Security-Validierung bei jeder Operation
- Rate Limiting gegen Überlastung
- Strukturiertes Logging aller Aktivitäten
- Berechtigungsprüfung auf User-Ebene
- Prepared Statements gegen SQL-Injection
- Performance-Monitoring integriert
- Claude Code Integration fertig
Der Server ist jetzt bereit für den Einsatz. Claude kann damit sicher auf Deine Datenbank zugreifen, ohne dass Du Dir Sorgen um Sicherheit machen musst.
Grundregel: Es klappt NIE beim ersten Mal. Aber mit den Tests und Logs findest Du jeden Fehler.
Jetzt wo Dein Server läuft, solltest Du ihn gründlich testen. Im nächsten Kapitel zeige ich Dir, wie Du mit strukturierten Tests sicherstellst, dass alles funktioniert. Von Unit Tests über Integration Tests bis zu Performance-Messungen - damit Du ruhig schlafen kannst. Weiter zum systematischen Testing und Debugging