Database-Modul entwickeln: Das Herzstück

Jetzt bauen wir das Database-Modul. Das ist die zentrale Komponente, die alle Datenbankoperationen durchführt.

Die Basis-Klasse

Wir starten mit der Grundstruktur unseres DatabaseManagers:

# modules/database.py
import pymysql
import json
import os
import logging
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from modules.security import SecurityValidator

class DatabaseManager:
    """Verwaltet alle Datenbankoperationen mit Sicherheit und Logging"""

    def __init__(self, config_dir: str):
        self.config_dir = config_dir
        self.config = self._load_config()
        self.credentials = self._load_credentials()
        self.security = SecurityValidator(config_dir)
        self.connection = None
        self.logger = logging.getLogger('database')

    def _load_config(self) -> Dict:
        """Lädt die Datenbank-Konfiguration"""
        config_path = os.path.join(self.config_dir, 'database.json')
        with open(config_path, 'r') as f:
            return json.load(f)

    def _load_credentials(self) -> Dict:
        """Lädt die Zugangsdaten"""
        cred_path = os.path.join(self.config_dir, 'credentials.json')
        with open(cred_path, 'r') as f:
            return json.load(f)

Connection Management

Verbindungen sind teuer. Wir verwenden sie wieder:

    def connect(self, user_type: str = None) -> pymysql.Connection:
        """Stellt Verbindung zur Datenbank her"""

        # Default User wenn nicht angegeben
        if not user_type:
            user_type = self.credentials.get('default_user', 'content_reader')

        # User-Credentials holen
        user_config = self.credentials['database_users'][user_type]

        # Verbindungsparameter
        connection_config = self.config['connections']['karlkratz_de']

        try:
            self.connection = pymysql.connect(
                host=connection_config['host'],
                port=connection_config['port'],
                database=connection_config['database'],
                user=user_type,
                password=user_config['password'],
                charset=connection_config['charset'],
                connect_timeout=connection_config['connect_timeout'],
                read_timeout=connection_config['read_timeout'],
                autocommit=False  # Explizite Transaktionen
            )

            self.logger.info(f"Verbindung hergestellt als {user_type}")
            return self.connection

        except pymysql.Error as e:
            self.logger.error(f"Verbindungsfehler: {e}")
            raise

    def disconnect(self):
        """Trennt die Datenbankverbindung"""
        if self.connection:
            try:
                self.connection.close()
                self.logger.info("Verbindung getrennt")
            except:
                pass
            finally:
                self.connection = None

SELECT-Operationen

Die wichtigste Operation: Daten lesen.

    def select(self,
               columns: List[str] = None,
               where: str = None,
               params: List = None,
               order_by: str = None,
               limit: int = None) -> Tuple[bool, Any]:
        """Führt SELECT-Query aus mit voller Validierung"""

        # Columns validieren
        if columns:
            if not self.security.validate_column_names(columns):
                return False, "Ungültige Spaltennamen"
            column_str = ', '.join(columns)
        else:
            column_str = '*'

        # Query bauen
        table = self.config['target_table']
        query = f"SELECT {column_str} FROM {table}"

        # WHERE-Klausel
        if where:
            valid, msg = self.security.validate_where_clause(where)
            if not valid:
                return False, msg
            query += f" WHERE {where}"

        # ORDER BY
        if order_by:
            # Nur alphanumerisch und underscore erlaubt
            import re
            if re.match(r'^[a-zA-Z0-9_]+ (ASC|DESC)$', order_by):
                query += f" ORDER BY {order_by}"

        # LIMIT
        if limit:
            if not isinstance(limit, int) or limit < 1 or limit > 100:
                return False, "Limit muss zwischen 1 und 100 sein"
            query += f" LIMIT {limit}"

        # Query validieren
        valid, msg = self.security.validate_query(query, params)
        if not valid:
            return False, msg

        # Ausführen
        try:
            with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(query, params or [])
                results = cursor.fetchall()

                self.logger.info(
                    f"SELECT ausgeführt: {len(results)} Zeilen"
                )

                return True, results

        except pymysql.Error as e:
            self.logger.error(f"SELECT-Fehler: {e}")
            return False, str(e)

Der DictCursor gibt die Ergebnisse als Dictionary zurück. Das macht die Weiterverarbeitung viel einfacher.

INSERT-Operationen

Neue Daten hinzufügen:

    def insert(self, data: Dict[str, Any]) -> Tuple[bool, Any]:
        """Fügt neuen Datensatz ein"""

        if not data:
            return False, "Keine Daten zum Einfügen"

        # Spalten validieren
        columns = list(data.keys())
        if not self.security.validate_column_names(columns):
            return False, "Ungültige Spaltennamen"

        # Query bauen
        table = self.config['target_table']
        placeholders = ', '.join(['%s'] * len(data))
        column_str = ', '.join(columns)

        query = f"INSERT INTO {table} ({column_str}) VALUES ({placeholders})"
        values = list(data.values())

        # Query validieren
        valid, msg = self.security.validate_query(query, values)
        if not valid:
            return False, msg

        # Ausführen
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, values)
                self.connection.commit()

                insert_id = cursor.lastrowid
                affected_rows = cursor.rowcount

                self.logger.info(
                    f"INSERT ausgeführt: ID {insert_id}, "
                    f"{affected_rows} Zeile eingefügt"
                )

                return True, {
                    'insert_id': insert_id,
                    'affected_rows': affected_rows
                }

        except pymysql.Error as e:
            self.connection.rollback()
            self.logger.error(f"INSERT-Fehler: {e}")
            return False, str(e)

UPDATE-Operationen

Bestehende Daten ändern, aber mit Vorsicht:

    def update(self,
               data: Dict[str, Any],
               where: str,
               params: List = None) -> Tuple[bool, Any]:
        """Aktualisiert Datensätze"""

        if not data:
            return False, "Keine Daten zum Aktualisieren"

        if not where:
            return False, "WHERE-Klausel fehlt (kein UPDATE ohne WHERE!)"

        # WHERE validieren
        valid, msg = self.security.validate_where_clause(where)
        if not valid:
            return False, msg

        # Spalten validieren
        columns = list(data.keys())
        if not self.security.validate_column_names(columns):
            return False, "Ungültige Spaltennamen"

        # SET-Teil bauen
        set_parts = [f"{col} = %s" for col in columns]
        set_clause = ', '.join(set_parts)

        # Query bauen
        table = self.config['target_table']
        query = f"UPDATE {table} SET {set_clause} WHERE {where}"

        # Values kombinieren
        values = list(data.values())
        if params:
            values.extend(params)

        # Query validieren
        valid, msg = self.security.validate_query(query, values)
        if not valid:
            return False, msg

        # Ausführen
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, values)

                affected_rows = cursor.rowcount

                # Prüfen ob zu viele Zeilen betroffen
                max_affected = self.security.security_config[
                    'query_limits']['max_affected_rows_update']

                if affected_rows > max_affected:
                    self.connection.rollback()
                    return False, f"Zu viele Zeilen betroffen ({affected_rows} > {max_affected})"

                self.connection.commit()

                self.logger.info(f"UPDATE ausgeführt: {affected_rows} Zeilen")

                return True, {'affected_rows': affected_rows}

        except pymysql.Error as e:
            self.connection.rollback()
            self.logger.error(f"UPDATE-Fehler: {e}")
            return False, str(e)

UPDATE ohne WHERE ist gefährlich. Das verhindern wir grundsätzlich.

DELETE-Operationen

Löschen, aber nur mit WHERE:

    def delete(self, where: str, params: List = None) -> Tuple[bool, Any]:
        """Löscht Datensätze"""

        if not where:
            return False, "WHERE-Klausel fehlt (kein DELETE ohne WHERE!)"

        # WHERE validieren
        valid, msg = self.security.validate_where_clause(where)
        if not valid:
            return False, msg

        # Query bauen
        table = self.config['target_table']
        query = f"DELETE FROM {table} WHERE {where}"

        # Query validieren
        valid, msg = self.security.validate_query(query, params)
        if not valid:
            return False, msg

        # Ausführen
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, params or [])

                affected_rows = cursor.rowcount

                # Prüfen ob zu viele Zeilen betroffen
                max_affected = self.security.security_config[
                    'query_limits']['max_affected_rows_delete']

                if affected_rows > max_affected:
                    self.connection.rollback()
                    return False, f"Zu viele Zeilen betroffen ({affected_rows} > {max_affected})"

                self.connection.commit()

                self.logger.info(f"DELETE ausgeführt: {affected_rows} Zeilen")

                return True, {'affected_rows': affected_rows}

        except pymysql.Error as e:
            self.connection.rollback()
            self.logger.error(f"DELETE-Fehler: {e}")
            return False, str(e)

Transaktions-Management

Für komplexe Operationen:

    def begin_transaction(self):
        """Startet eine Transaktion"""
        if self.connection:
            self.connection.begin()
            self.logger.info("Transaktion gestartet")

    def commit_transaction(self):
        """Schließt Transaktion erfolgreich ab"""
        if self.connection:
            self.connection.commit()
            self.logger.info("Transaktion committed")

    def rollback_transaction(self):
        """Macht Transaktion rückgängig"""
        if self.connection:
            self.connection.rollback()
            self.logger.warning("Transaktion zurückgerollt")

Utility-Funktionen

Nützliche Hilfsfunktionen:

    def list_tables(self) -> Tuple[bool, Any]:
        """Listet alle Tabellen der Datenbank"""
        query = "SHOW TABLES"

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                tables = [row[0] for row in cursor.fetchall()]
                return True, tables
        except pymysql.Error as e:
            return False, str(e)

    def describe_table(self, table: str = None) -> Tuple[bool, Any]:
        """Zeigt Tabellenstruktur"""
        if not table:
            table = self.config['target_table']

        # Tabellenname validieren
        if not self.security.validate_table_name(table):
            return False, "Ungültiger Tabellenname"

        query = f"DESCRIBE {table}"

        try:
            with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(query)
                structure = cursor.fetchall()
                return True, structure
        except pymysql.Error as e:
            return False, str(e)

    def count_records(self, where: str = None, params: List = None) -> Tuple[bool, Any]:
        """Zählt Datensätze"""
        table = self.config['target_table']
        query = f"SELECT COUNT(*) as count FROM {table}"

        if where:
            valid, msg = self.security.validate_where_clause(where)
            if not valid:
                return False, msg
            query += f" WHERE {where}"

        try:
            with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(query, params or [])
                result = cursor.fetchone()
                return True, result['count']
        except pymysql.Error as e:
            return False, str(e)

Database-Modul testen

Ein kleiner Test, ob alles funktioniert:

# test_database.py
#!/usr/bin/env python3
import sys
sys.path.append('.')
from modules.database import DatabaseManager

def test_database():
    """Testet das Database-Modul"""

    db = DatabaseManager('./config')

    try:
        # Verbindung aufbauen
        db.connect('content_reader')
        print("OK: Verbindung hergestellt")

        # SELECT Test
        success, result = db.select(
            columns=['id', 'title'],
            limit=5
        )
        if success:
            print(f"OK: SELECT - {len(result)} Zeilen")
        else:
            print(f"FEHLER: SELECT - {result}")

        # COUNT Test
        success, count = db.count_records()
        if success:
            print(f"OK: COUNT - {count} Datensätze")

        # Tabellenstruktur
        success, structure = db.describe_table()
        if success:
            print(f"OK: DESCRIBE - {len(structure)} Spalten")

    finally:
        db.disconnect()

    print("\nDatabase-Modul Tests bestanden!")

if __name__ == "__main__":
    test_database()

Was haben wir gebaut?

Unser Database-Modul kann jetzt:

Jetzt haben wir eine solide Datenbank-Schicht. Als nächstes kümmern wir uns um das Logging, denn ohne ordentliche Logs bist Du blind wenn was schiefgeht. Ich zeige Dir, wie Du strukturierte Logs aufbaust, Performance-Metriken sammelst und bei kritischen Events automatisch Alerts bekommst. Weiter zum Logging-System mit strukturierten Logs