Database-Modul entwickeln: Das Herzstück
Jetzt bauen wir das Database-Modul. Das ist die zentrale Komponente, die alle Datenbankoperationen durchführt.
Die Basis-Klasse
Wir starten mit der Grundstruktur unseres DatabaseManagers:
# modules/database.py
import pymysql
import json
import os
import logging
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from modules.security import SecurityValidator
class DatabaseManager:
"""Verwaltet alle Datenbankoperationen mit Sicherheit und Logging"""
def __init__(self, config_dir: str):
self.config_dir = config_dir
self.config = self._load_config()
self.credentials = self._load_credentials()
self.security = SecurityValidator(config_dir)
self.connection = None
self.logger = logging.getLogger('database')
def _load_config(self) -> Dict:
"""Lädt die Datenbank-Konfiguration"""
config_path = os.path.join(self.config_dir, 'database.json')
with open(config_path, 'r') as f:
return json.load(f)
def _load_credentials(self) -> Dict:
"""Lädt die Zugangsdaten"""
cred_path = os.path.join(self.config_dir, 'credentials.json')
with open(cred_path, 'r') as f:
return json.load(f)
Connection Management
Verbindungen sind teuer. Wir verwenden sie wieder:
def connect(self, user_type: str = None) -> pymysql.Connection:
"""Stellt Verbindung zur Datenbank her"""
# Default User wenn nicht angegeben
if not user_type:
user_type = self.credentials.get('default_user', 'content_reader')
# User-Credentials holen
user_config = self.credentials['database_users'][user_type]
# Verbindungsparameter
connection_config = self.config['connections']['karlkratz_de']
try:
self.connection = pymysql.connect(
host=connection_config['host'],
port=connection_config['port'],
database=connection_config['database'],
user=user_type,
password=user_config['password'],
charset=connection_config['charset'],
connect_timeout=connection_config['connect_timeout'],
read_timeout=connection_config['read_timeout'],
autocommit=False # Explizite Transaktionen
)
self.logger.info(f"Verbindung hergestellt als {user_type}")
return self.connection
except pymysql.Error as e:
self.logger.error(f"Verbindungsfehler: {e}")
raise
def disconnect(self):
"""Trennt die Datenbankverbindung"""
if self.connection:
try:
self.connection.close()
self.logger.info("Verbindung getrennt")
except:
pass
finally:
self.connection = None
SELECT-Operationen
Die wichtigste Operation: Daten lesen.
def select(self,
columns: List[str] = None,
where: str = None,
params: List = None,
order_by: str = None,
limit: int = None) -> Tuple[bool, Any]:
"""Führt SELECT-Query aus mit voller Validierung"""
# Columns validieren
if columns:
if not self.security.validate_column_names(columns):
return False, "Ungültige Spaltennamen"
column_str = ', '.join(columns)
else:
column_str = '*'
# Query bauen
table = self.config['target_table']
query = f"SELECT {column_str} FROM {table}"
# WHERE-Klausel
if where:
valid, msg = self.security.validate_where_clause(where)
if not valid:
return False, msg
query += f" WHERE {where}"
# ORDER BY
if order_by:
# Nur alphanumerisch und underscore erlaubt
import re
if re.match(r'^[a-zA-Z0-9_]+ (ASC|DESC)$', order_by):
query += f" ORDER BY {order_by}"
# LIMIT
if limit:
if not isinstance(limit, int) or limit < 1 or limit > 100:
return False, "Limit muss zwischen 1 und 100 sein"
query += f" LIMIT {limit}"
# Query validieren
valid, msg = self.security.validate_query(query, params)
if not valid:
return False, msg
# Ausführen
try:
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(query, params or [])
results = cursor.fetchall()
self.logger.info(
f"SELECT ausgeführt: {len(results)} Zeilen"
)
return True, results
except pymysql.Error as e:
self.logger.error(f"SELECT-Fehler: {e}")
return False, str(e)
Der DictCursor gibt die Ergebnisse als Dictionary zurück. Das macht die Weiterverarbeitung viel einfacher.
INSERT-Operationen
Neue Daten hinzufügen:
def insert(self, data: Dict[str, Any]) -> Tuple[bool, Any]:
"""Fügt neuen Datensatz ein"""
if not data:
return False, "Keine Daten zum Einfügen"
# Spalten validieren
columns = list(data.keys())
if not self.security.validate_column_names(columns):
return False, "Ungültige Spaltennamen"
# Query bauen
table = self.config['target_table']
placeholders = ', '.join(['%s'] * len(data))
column_str = ', '.join(columns)
query = f"INSERT INTO {table} ({column_str}) VALUES ({placeholders})"
values = list(data.values())
# Query validieren
valid, msg = self.security.validate_query(query, values)
if not valid:
return False, msg
# Ausführen
try:
with self.connection.cursor() as cursor:
cursor.execute(query, values)
self.connection.commit()
insert_id = cursor.lastrowid
affected_rows = cursor.rowcount
self.logger.info(
f"INSERT ausgeführt: ID {insert_id}, "
f"{affected_rows} Zeile eingefügt"
)
return True, {
'insert_id': insert_id,
'affected_rows': affected_rows
}
except pymysql.Error as e:
self.connection.rollback()
self.logger.error(f"INSERT-Fehler: {e}")
return False, str(e)
UPDATE-Operationen
Bestehende Daten ändern, aber mit Vorsicht:
def update(self,
data: Dict[str, Any],
where: str,
params: List = None) -> Tuple[bool, Any]:
"""Aktualisiert Datensätze"""
if not data:
return False, "Keine Daten zum Aktualisieren"
if not where:
return False, "WHERE-Klausel fehlt (kein UPDATE ohne WHERE!)"
# WHERE validieren
valid, msg = self.security.validate_where_clause(where)
if not valid:
return False, msg
# Spalten validieren
columns = list(data.keys())
if not self.security.validate_column_names(columns):
return False, "Ungültige Spaltennamen"
# SET-Teil bauen
set_parts = [f"{col} = %s" for col in columns]
set_clause = ', '.join(set_parts)
# Query bauen
table = self.config['target_table']
query = f"UPDATE {table} SET {set_clause} WHERE {where}"
# Values kombinieren
values = list(data.values())
if params:
values.extend(params)
# Query validieren
valid, msg = self.security.validate_query(query, values)
if not valid:
return False, msg
# Ausführen
try:
with self.connection.cursor() as cursor:
cursor.execute(query, values)
affected_rows = cursor.rowcount
# Prüfen ob zu viele Zeilen betroffen
max_affected = self.security.security_config[
'query_limits']['max_affected_rows_update']
if affected_rows > max_affected:
self.connection.rollback()
return False, f"Zu viele Zeilen betroffen ({affected_rows} > {max_affected})"
self.connection.commit()
self.logger.info(f"UPDATE ausgeführt: {affected_rows} Zeilen")
return True, {'affected_rows': affected_rows}
except pymysql.Error as e:
self.connection.rollback()
self.logger.error(f"UPDATE-Fehler: {e}")
return False, str(e)
UPDATE ohne WHERE ist gefährlich. Das verhindern wir grundsätzlich.
DELETE-Operationen
Löschen, aber nur mit WHERE:
def delete(self, where: str, params: List = None) -> Tuple[bool, Any]:
"""Löscht Datensätze"""
if not where:
return False, "WHERE-Klausel fehlt (kein DELETE ohne WHERE!)"
# WHERE validieren
valid, msg = self.security.validate_where_clause(where)
if not valid:
return False, msg
# Query bauen
table = self.config['target_table']
query = f"DELETE FROM {table} WHERE {where}"
# Query validieren
valid, msg = self.security.validate_query(query, params)
if not valid:
return False, msg
# Ausführen
try:
with self.connection.cursor() as cursor:
cursor.execute(query, params or [])
affected_rows = cursor.rowcount
# Prüfen ob zu viele Zeilen betroffen
max_affected = self.security.security_config[
'query_limits']['max_affected_rows_delete']
if affected_rows > max_affected:
self.connection.rollback()
return False, f"Zu viele Zeilen betroffen ({affected_rows} > {max_affected})"
self.connection.commit()
self.logger.info(f"DELETE ausgeführt: {affected_rows} Zeilen")
return True, {'affected_rows': affected_rows}
except pymysql.Error as e:
self.connection.rollback()
self.logger.error(f"DELETE-Fehler: {e}")
return False, str(e)
Transaktions-Management
Für komplexe Operationen:
def begin_transaction(self):
"""Startet eine Transaktion"""
if self.connection:
self.connection.begin()
self.logger.info("Transaktion gestartet")
def commit_transaction(self):
"""Schließt Transaktion erfolgreich ab"""
if self.connection:
self.connection.commit()
self.logger.info("Transaktion committed")
def rollback_transaction(self):
"""Macht Transaktion rückgängig"""
if self.connection:
self.connection.rollback()
self.logger.warning("Transaktion zurückgerollt")
Utility-Funktionen
Nützliche Hilfsfunktionen:
def list_tables(self) -> Tuple[bool, Any]:
"""Listet alle Tabellen der Datenbank"""
query = "SHOW TABLES"
try:
with self.connection.cursor() as cursor:
cursor.execute(query)
tables = [row[0] for row in cursor.fetchall()]
return True, tables
except pymysql.Error as e:
return False, str(e)
def describe_table(self, table: str = None) -> Tuple[bool, Any]:
"""Zeigt Tabellenstruktur"""
if not table:
table = self.config['target_table']
# Tabellenname validieren
if not self.security.validate_table_name(table):
return False, "Ungültiger Tabellenname"
query = f"DESCRIBE {table}"
try:
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(query)
structure = cursor.fetchall()
return True, structure
except pymysql.Error as e:
return False, str(e)
def count_records(self, where: str = None, params: List = None) -> Tuple[bool, Any]:
"""Zählt Datensätze"""
table = self.config['target_table']
query = f"SELECT COUNT(*) as count FROM {table}"
if where:
valid, msg = self.security.validate_where_clause(where)
if not valid:
return False, msg
query += f" WHERE {where}"
try:
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(query, params or [])
result = cursor.fetchone()
return True, result['count']
except pymysql.Error as e:
return False, str(e)
Database-Modul testen
Ein kleiner Test, ob alles funktioniert:
# test_database.py
#!/usr/bin/env python3
import sys
sys.path.append('.')
from modules.database import DatabaseManager
def test_database():
"""Testet das Database-Modul"""
db = DatabaseManager('./config')
try:
# Verbindung aufbauen
db.connect('content_reader')
print("OK: Verbindung hergestellt")
# SELECT Test
success, result = db.select(
columns=['id', 'title'],
limit=5
)
if success:
print(f"OK: SELECT - {len(result)} Zeilen")
else:
print(f"FEHLER: SELECT - {result}")
# COUNT Test
success, count = db.count_records()
if success:
print(f"OK: COUNT - {count} Datensätze")
# Tabellenstruktur
success, structure = db.describe_table()
if success:
print(f"OK: DESCRIBE - {len(structure)} Spalten")
finally:
db.disconnect()
print("\nDatabase-Modul Tests bestanden!")
if __name__ == "__main__":
test_database()
Was haben wir gebaut?
Unser Database-Modul kann jetzt:
- Sichere Verbindungen mit verschiedenen Benutzern aufbauen
- SELECT-Queries mit voller Validierung ausführen
- INSERT von neuen Datensätzen mit Parameter Binding
- UPDATE mit WHERE-Klausel und Limits
- DELETE mit Sicherheitschecks
- Transaktions-Management für komplexe Operationen
- Utility-Funktionen für Metadaten
Jetzt haben wir eine solide Datenbank-Schicht. Als nächstes kümmern wir uns um das Logging, denn ohne ordentliche Logs bist Du blind wenn was schiefgeht. Ich zeige Dir, wie Du strukturierte Logs aufbaust, Performance-Metriken sammelst und bei kritischen Events automatisch Alerts bekommst. Weiter zum Logging-System mit strukturierten Logs