MCP Server implementieren: Alles zusammenfügen

Jetzt kommt der spannende Teil: Wir fügen alle Module zusammen und bauen den eigentlichen MCP Server mit FastMCP.

Der Hauptserver

Hier ist das Herzstück - die server.py:

#!/usr/bin/env python3
# server.py - MCP Content Metadata Server

import os
import sys
import logging
from typing import Dict, Any, Optional, List
from fastmcp import FastMCP

# Module importieren
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from modules.database import DatabaseManager
from modules.security import SecurityValidator
from modules.logging import LogManager

# Server initialisieren
mcp = FastMCP("Content Metadata Server")

# Globale Manager
config_dir = os.environ.get('DATENBANK_CONFIG_DIR', './config')
log_manager = LogManager(config_dir)
security = SecurityValidator(config_dir)
db_manager = None  # Wird bei Bedarf initialisiert

# Logger setup
logger = log_manager.get_logger('server')
logger.info("MCP Server startet...")

def get_db_connection():
    """Lazy-Loading für Datenbankverbindung"""
    global db_manager
    if db_manager is None:
        db_manager = DatabaseManager(config_dir)
        db_manager.connect()
    return db_manager

Tool-Definitionen

Jede Operation wird als Tool registriert:

@mcp.tool()
async def list_tables() -> Dict[str, Any]:
    """
    Listet alle verfügbaren Tabellen der Datenbank auf.
    Nützlich um zu sehen, welche Tabellen verfügbar sind.
    """
    try:
        db = get_db_connection()
        success, result = db.list_tables()

        if success:
            return {
                "success": True,
                "tables": result,
                "count": len(result)
            }
        else:
            return {
                "success": False,
                "error": result
            }
    except Exception as e:
        logger.error(f"list_tables error: {e}")
        return {
            "success": False,
            "error": str(e)
        }

@mcp.tool()
async def describe_table(table: str = None) -> Dict[str, Any]:
    """
    Zeigt die Struktur einer Tabelle (Spalten, Typen, etc).

    Args:
        table: Tabellenname (optional, default: content_metadata)
    """
    try:
        db = get_db_connection()

        # Security Check
        if table and not security.validate_table_name(table):
            return {
                "success": False,
                "error": "Ungültiger Tabellenname"
            }

        success, result = db.describe_table(table)

        if success:
            return {
                "success": True,
                "table": table or "content_metadata",
                "structure": result
            }
        else:
            return {
                "success": False,
                "error": result
            }
    except Exception as e:
        logger.error(f"describe_table error: {e}")
        return {
            "success": False,
            "error": str(e)
        }

SELECT-Tool

Das wichtigste Tool - Daten abfragen:

@mcp.tool()
async def select_data(
    columns: Optional[List[str]] = None,
    where: Optional[str] = None,
    params: Optional[List] = None,
    order_by: Optional[str] = None,
    limit: Optional[int] = None
) -> Dict[str, Any]:
    """
    Führt eine SELECT-Query auf content_metadata aus.

    Args:
        columns: Liste der Spalten die zurückgegeben werden sollen
        where: WHERE-Klausel (ohne WHERE keyword)
        params: Parameter für prepared statements
        order_by: ORDER BY Klausel (z.B. "created_at DESC")
        limit: Maximale Anzahl Ergebnisse (max 100)

    Example:
        select_data(
            columns=["id", "title", "status"],
            where="status = ?",
            params=["published"],
            limit=10
        )
    """
    try:
        db = get_db_connection()

        # Rate Limiting
        user = "default"  # Könnte aus Context kommen
        rate_ok, rate_msg = security.check_rate_limit(user)
        if not rate_ok:
            return {
                "success": False,
                "error": rate_msg
            }

        # Query ausführen
        success, result = db.select(
            columns=columns,
            where=where,
            params=params,
            order_by=order_by,
            limit=limit
        )

        # Logging
        log_manager.log_query(
            query_type="SELECT",
            query=f"SELECT {columns or '*'} FROM content_metadata",
            params=params,
            affected_rows=len(result) if success else None,
            error=result if not success else None
        )

        if success:
            return {
                "success": True,
                "data": result,
                "count": len(result)
            }
        else:
            return {
                "success": False,
                "error": result
            }

    except Exception as e:
        logger.error(f"select_data error: {e}")
        return {
            "success": False,
            "error": str(e)
        }

INSERT-Tool

Neue Datensätze hinzufügen:

@mcp.tool()
async def insert_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Fügt einen neuen Datensatz in content_metadata ein.

    Args:
        data: Dictionary mit Spalten und Werten

    Example:
        insert_data({
            "title": "Neue Seite",
            "meta_description": "Beschreibung",
            "status": "draft"
        })
    """
    try:
        db = get_db_connection()

        # Pflichtfelder prüfen
        if not data.get('title'):
            return {
                "success": False,
                "error": "Title ist ein Pflichtfeld"
            }

        # Defaults setzen
        if 'status' not in data:
            data['status'] = 'draft'
        if 'template' not in data:
            data['template'] = '100-template'

        # User-Type für Berechtigung
        user_type = 'content_writer'  # Könnte aus Context kommen

        # Berechtigung prüfen
        if not security.check_operation_permission(
            user_type, 'INSERT', 'karlkratz_de'
        ):
            return {
                "success": False,
                "error": "Keine Berechtigung für INSERT"
            }

        # Datenbank-Operation
        db.connect(user_type)
        success, result = db.insert(data)

        # Logging
        log_manager.log_query(
            query_type="INSERT",
            query="INSERT INTO content_metadata",
            affected_rows=result.get('affected_rows') if success else None,
            error=result if not success else None
        )

        if success:
            return {
                "success": True,
                "insert_id": result['insert_id'],
                "message": f"Datensatz mit ID {result['insert_id']} eingefügt"
            }
        else:
            return {
                "success": False,
                "error": result
            }

    except Exception as e:
        logger.error(f"insert_data error: {e}")
        db.rollback_transaction()
        return {
            "success": False,
            "error": str(e)
        }

UPDATE-Tool

Bestehende Daten ändern:

@mcp.tool()
async def update_data(
    data: Dict[str, Any],
    where: str,
    params: Optional[List] = None
) -> Dict[str, Any]:
    """
    Aktualisiert Datensätze in content_metadata.

    Args:
        data: Dictionary mit zu aktualisierenden Spalten
        where: WHERE-Klausel (ohne WHERE keyword)
        params: Parameter für WHERE-Klausel

    Example:
        update_data(
            data={"status": "published"},
            where="id = ?",
            params=[42]
        )
    """
    try:
        db = get_db_connection()

        # Berechtigung prüfen
        user_type = 'content_writer'
        if not security.check_operation_permission(
            user_type, 'UPDATE', 'karlkratz_de'
        ):
            return {
                "success": False,
                "error": "Keine Berechtigung für UPDATE"
            }

        # Timestamp aktualisieren
        data['updated_at'] = 'CURRENT_TIMESTAMP'

        # Datenbank-Operation
        db.connect(user_type)
        success, result = db.update(data, where, params)

        # Logging
        log_manager.log_query(
            query_type="UPDATE",
            query="UPDATE content_metadata",
            params=params,
            affected_rows=result.get('affected_rows') if success else None,
            error=result if not success else None
        )

        if success:
            return {
                "success": True,
                "affected_rows": result['affected_rows'],
                "message": f"{result['affected_rows']} Datensätze aktualisiert"
            }
        else:
            return {
                "success": False,
                "error": result
            }

    except Exception as e:
        logger.error(f"update_data error: {e}")
        db.rollback_transaction()
        return {
            "success": False,
            "error": str(e)
        }

DELETE-Tool

Datensätze löschen (mit Vorsicht!):

@mcp.tool()
async def delete_data(
    where: str,
    params: Optional[List] = None
) -> Dict[str, Any]:
    """
    Löscht Datensätze aus content_metadata.
    VORSICHT: Diese Operation kann nicht rückgängig gemacht werden!

    Args:
        where: WHERE-Klausel (ohne WHERE keyword)
        params: Parameter für WHERE-Klausel

    Example:
        delete_data(
            where="status = ? AND created_at < ?",
            params=["draft", "2024-01-01"]
        )
    """
    try:
        db = get_db_connection()

        # Extra Sicherheitsabfrage
        if not where or where.strip().lower() in ['1=1', 'true']:
            return {
                "success": False,
                "error": "Unsichere WHERE-Klausel"
            }

        # Berechtigung prüfen
        user_type = 'content_writer'
        if not security.check_operation_permission(
            user_type, 'DELETE', 'karlkratz_de'
        ):
            return {
                "success": False,
                "error": "Keine Berechtigung für DELETE"
            }

        # Datenbank-Operation
        db.connect(user_type)
        success, result = db.delete(where, params)

        # Logging (besonders wichtig bei DELETE!)
        log_manager.log_query(
            query_type="DELETE",
            query="DELETE FROM content_metadata",
            params=params,
            affected_rows=result.get('affected_rows') if success else None,
            error=result if not success else None
        )

        if success:
            # Security Event bei vielen gelöschten Zeilen
            if result['affected_rows'] > 5:
                log_manager.log_security_event(
                    event_type='MASS_DELETE',
                    details={
                        'affected_rows': result['affected_rows'],
                        'where': where
                    },
                    severity='HIGH'
                )

            return {
                "success": True,
                "affected_rows": result['affected_rows'],
                "message": f"{result['affected_rows']} Datensätze gelöscht"
            }
        else:
            return {
                "success": False,
                "error": result
            }

    except Exception as e:
        logger.error(f"delete_data error: {e}")
        db.rollback_transaction()
        return {
            "success": False,
            "error": str(e)
        }

Utility-Tools

Zusätzliche nützliche Tools:

@mcp.tool()
async def count_records(
    where: Optional[str] = None,
    params: Optional[List] = None
) -> Dict[str, Any]:
    """
    Zählt Datensätze in content_metadata.

    Args:
        where: Optionale WHERE-Klausel
        params: Parameter für WHERE-Klausel
    """
    try:
        db = get_db_connection()
        success, count = db.count_records(where, params)

        if success:
            return {
                "success": True,
                "count": count
            }
        else:
            return {
                "success": False,
                "error": count
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }

@mcp.tool()
async def get_log_stats(
    start_time: Optional[str] = None,
    end_time: Optional[str] = None
) -> Dict[str, Any]:
    """
    Gibt Statistiken über die Server-Nutzung zurück.

    Args:
        start_time: Start-Zeitpunkt (ISO format)
        end_time: End-Zeitpunkt (ISO format)
    """
    try:
        from datetime import datetime

        start = datetime.fromisoformat(start_time) if start_time else None
        end = datetime.fromisoformat(end_time) if end_time else None

        stats = log_manager.analyze_logs(
            start_time=start,
            end_time=end
        )

        return {
            "success": True,
            "stats": stats
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }

Server starten

Der finale Teil - Server hochfahren:

if __name__ == "__main__":
    import asyncio

    # Server Info ausgeben
    print(f"""
    ╔════════════════════════════════════════╗
    ║   Content Metadata MCP Server v1.0     ║
    ╠════════════════════════════════════════╣
    ║   Config Dir: {config_dir:<25} ║
    ║   Target Table: content_metadata       ║
    ║   Security: Enabled                    ║
    ║   Logging: Enabled                     ║
    ╚════════════════════════════════════════╝
    """)

    # Server starten
    try:
        logger.info("Server bereit für Verbindungen")
        asyncio.run(mcp.run())
    except KeyboardInterrupt:
        logger.info("Server wird heruntergefahren...")
        if db_manager:
            db_manager.disconnect()
        print("\nServer beendet.")
    except Exception as e:
        logger.error(f"Server-Fehler: {e}")
        raise

Claude Code Integration

Damit Claude den Server findet, brauchst Du diese Konfiguration:

# ~/.claude/mcp.json
{
  "mcpServers": {
    "content-metadata": {
      "command": "/var/deinverzeichnis/mcp/content-metadata-server/venv/bin/python",
      "args": ["/var/deinverzeichnis/mcp/content-metadata-server/server.py"],
      "env": {
        "DATENBANK_CONFIG_DIR": "/var/deinverzeichnis/mcp/content-metadata-server/config"
      }
    }
  }
}

Server testen

Teste ob alles funktioniert:

# test_server.py
#!/usr/bin/env python3
import asyncio
import sys
sys.path.append('.')

# Server-Module importieren
from server import (
    list_tables,
    describe_table,
    select_data,
    count_records
)

async def test_server():
    """Testet die Server-Funktionen"""

    print("=== MCP Server Test ===\n")

    # Test 1: Tabellen auflisten
    print("1. Tabellen auflisten:")
    result = await list_tables()
    if result['success']:
        print(f"   Gefunden: {result['count']} Tabellen")
    else:
        print(f"   FEHLER: {result['error']}")

    # Test 2: Tabellenstruktur
    print("\n2. Tabellenstruktur:")
    result = await describe_table()
    if result['success']:
        print(f"   Spalten: {len(result['structure'])}")
    else:
        print(f"   FEHLER: {result['error']}")

    # Test 3: Daten abfragen
    print("\n3. Daten abfragen:")
    result = await select_data(
        columns=["id", "title", "status"],
        where="status = ?",
        params=["published"],
        limit=5
    )
    if result['success']:
        print(f"   Gefunden: {result['count']} Datensätze")
        for row in result['data']:
            print(f"   - [{row['id']}] {row['title'][:50]}...")
    else:
        print(f"   FEHLER: {result['error']}")

    # Test 4: Anzahl zählen
    print("\n4. Datensätze zählen:")
    result = await count_records()
    if result['success']:
        print(f"   Gesamt: {result['count']} Datensätze")
    else:
        print(f"   FEHLER: {result['error']}")

    print("\n=== Test abgeschlossen ===")

if __name__ == "__main__":
    asyncio.run(test_server())

Führe den Test aus mit: python test_server.py

Troubleshooting

Falls was nicht klappt:

Server startet nicht:

Keine Verbindung zur Datenbank:

Claude findet Server nicht:

Performance-Optimierungen

Für Production-Einsatz:

# Connection Pooling hinzufügen
from pymysql import connections

class ConnectionPool:
    def __init__(self, size=5):
        self.pool = []
        self.size = size

    def get_connection(self):
        if self.pool:
            return self.pool.pop()
        else:
            return self._create_connection()

    def return_connection(self, conn):
        if len(self.pool) < self.size:
            self.pool.append(conn)
        else:
            conn.close()

# Async Support
import aiomysql

async def async_query(query, params):
    async with aiomysql.connect(
        host='localhost',
        port=3306,
        user='content_reader',
        password='xxx',
        db='karlkratz_de'
    ) as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(query, params)
            return await cursor.fetchall()

Was haben wir gebaut?

Dein MCP Server kann jetzt:

Der Server ist jetzt bereit für den Einsatz. Claude kann damit sicher auf Deine Datenbank zugreifen, ohne dass Du Dir Sorgen um Sicherheit machen musst.

Grundregel: Es klappt NIE beim ersten Mal. Aber mit den Tests und Logs findest Du jeden Fehler.

Jetzt wo Dein Server läuft, solltest Du ihn gründlich testen. Im nächsten Kapitel zeige ich Dir, wie Du mit strukturierten Tests sicherstellst, dass alles funktioniert. Von Unit Tests über Integration Tests bis zu Performance-Messungen - damit Du ruhig schlafen kannst. Weiter zum systematischen Testing und Debugging