MD2WordPress/wordpress_api.py
Jörg Lohrer 9ba1aa7b10 Bugfix: Tag-Duplikate, Post-Duplikate und Veröffentlichungsdatum
Fixes:
- Tag/Kategorie-Erstellung: Bessere Fehlerbehandlung für bereits existierende Tags
- Post-Duplikatsprüfung: Verbesserte Suche mit status='any' und case-insensitive Vergleich
- Veröffentlichungsdatum: datePublished aus Frontmatter wird als WordPress-Datum gesetzt
- Erweiterte Datumsextraktion aus verschiedenen Frontmatter-Strukturen

Neue Datei:
- USAGE_MODES.md: Übersicht der drei Verwendungsmodi
2025-10-01 08:30:07 +02:00

324 lines
12 KiB
Python

#!/usr/bin/env python3
"""
WordPress API Helper
Bietet Funktionen zum Erstellen von WordPress-Beiträgen und Hochladen von Medien
über die WordPress REST-API mit Duplikatsprüfung.
"""
import os
import requests
import hashlib
from typing import Optional, Dict, Any, List
from urllib.parse import urljoin
import mimetypes
class WordPressAPI:
"""WordPress REST-API Client mit Duplikatsprüfung"""
def __init__(self, url: str, username: str, app_password: str):
"""
Initialisiert den WordPress API Client
Args:
url: WordPress URL (z.B. https://news.rpi-virtuell.de)
username: WordPress Benutzername
app_password: WordPress Anwendungspasswort
"""
self.url = url.rstrip('/')
self.api_base = urljoin(self.url, '/wp-json/wp/v2/')
self.auth = (username, app_password.replace(' ', ''))
self.session = requests.Session()
self.session.auth = self.auth
def _get(self, endpoint: str, params: Optional[Dict] = None) -> requests.Response:
"""GET-Request an WordPress API"""
url = urljoin(self.api_base, endpoint)
response = self.session.get(url, params=params)
response.raise_for_status()
return response
def _post(self, endpoint: str, data: Optional[Dict] = None,
files: Optional[Dict] = None, headers: Optional[Dict] = None) -> requests.Response:
"""POST-Request an WordPress API"""
url = urljoin(self.api_base, endpoint)
response = self.session.post(url, json=data, files=files, headers=headers)
response.raise_for_status()
return response
def check_post_exists(self, title: str) -> Optional[int]:
"""
Prüft, ob ein Beitrag mit dem Titel bereits existiert
Args:
title: Titel des Beitrags
Returns:
Post-ID wenn gefunden, sonst None
"""
try:
# Suche mit verschiedenen Parametern
response = self._get('posts', params={
'search': title,
'per_page': 100, # Erhöht für bessere Suche
'status': 'any' # Alle Status (draft, publish, etc.)
})
posts = response.json()
# Normalisiere Titel für Vergleich
title_lower = title.lower().strip()
# Exakte Übereinstimmung prüfen
for post in posts:
# Prüfe rendered Titel
rendered_title = post.get('title', {}).get('rendered', '').strip()
if rendered_title.lower() == title_lower:
print(f" → Beitrag gefunden (ID: {post['id']}, Status: {post['status']})")
return post['id']
# Prüfe auch raw Titel falls vorhanden
raw_title = post.get('title', {}).get('raw', '').strip()
if raw_title and raw_title.lower() == title_lower:
print(f" → Beitrag gefunden (ID: {post['id']}, Status: {post['status']})")
return post['id']
return None
except requests.exceptions.RequestException as e:
print(f"Fehler bei der Suche nach Beitrag: {e}")
return None
def check_media_exists(self, filename: str, file_hash: Optional[str] = None) -> Optional[int]:
"""
Prüft, ob eine Mediendatei bereits existiert
Args:
filename: Dateiname
file_hash: Optional MD5-Hash der Datei für präzisere Prüfung
Returns:
Media-ID wenn gefunden, sonst None
"""
try:
# Suche nach Dateiname
response = self._get('media', params={'search': filename, 'per_page': 10})
media_items = response.json()
for item in media_items:
source_url = item.get('source_url', '')
if filename in source_url:
# Wenn Hash gegeben, zusätzlich prüfen
if file_hash:
# Hash kann nicht direkt verglichen werden,
# daher nur Dateiname-Check
return item['id']
return item['id']
return None
except requests.exceptions.RequestException as e:
print(f"Fehler bei der Suche nach Medien: {e}")
return None
def upload_media(self, file_path: str, title: Optional[str] = None,
alt_text: Optional[str] = None,
check_duplicate: bool = True) -> Optional[int]:
"""
Lädt eine Mediendatei zu WordPress hoch
Args:
file_path: Pfad zur Datei
title: Titel der Mediendatei (optional)
alt_text: Alt-Text für Bilder (optional)
check_duplicate: Prüfung auf Duplikate (Standard: True)
Returns:
Media-ID der hochgeladenen Datei, oder None bei Fehler
"""
if not os.path.exists(file_path):
print(f"Datei nicht gefunden: {file_path}")
return None
filename = os.path.basename(file_path)
# Duplikatsprüfung
if check_duplicate:
existing_id = self.check_media_exists(filename)
if existing_id:
print(f"Medien-Datei '{filename}' existiert bereits (ID: {existing_id})")
return existing_id
# MIME-Type ermitteln
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
mime_type = 'application/octet-stream'
# Datei hochladen
try:
with open(file_path, 'rb') as f:
files = {
'file': (filename, f, mime_type)
}
headers = {
'Content-Disposition': f'attachment; filename="{filename}"'
}
if title:
headers['Content-Title'] = title
if alt_text:
headers['Content-Alt-Text'] = alt_text
url = urljoin(self.api_base, 'media')
response = self.session.post(url, files=files, headers=headers)
response.raise_for_status()
media_data = response.json()
media_id = media_data['id']
print(f"Medien-Datei '{filename}' hochgeladen (ID: {media_id})")
return media_id
except requests.exceptions.RequestException as e:
print(f"Fehler beim Hochladen der Medien-Datei: {e}")
return None
def create_post(self, title: str, content: str,
status: str = 'draft',
featured_media: Optional[int] = None,
categories: Optional[List[int]] = None,
tags: Optional[List[int]] = None,
check_duplicate: bool = True,
**kwargs) -> Optional[int]:
"""
Erstellt einen neuen WordPress-Beitrag
Args:
title: Titel des Beitrags
content: Inhalt des Beitrags (HTML)
status: Status (draft, publish, etc.)
featured_media: ID des Beitragsbilds
categories: Liste der Kategorie-IDs
tags: Liste der Tag-IDs
check_duplicate: Prüfung auf Duplikate (Standard: True)
**kwargs: Weitere WordPress-Post-Felder
Returns:
Post-ID des erstellten Beitrags, oder None bei Fehler
"""
# Duplikatsprüfung
if check_duplicate:
existing_id = self.check_post_exists(title)
if existing_id:
print(f"Beitrag '{title}' existiert bereits (ID: {existing_id})")
return existing_id
# Post-Daten zusammenstellen
post_data = {
'title': title,
'content': content,
'status': status,
**kwargs
}
if featured_media:
post_data['featured_media'] = featured_media
if categories:
post_data['categories'] = categories
if tags:
post_data['tags'] = tags
# Beitrag erstellen
try:
response = self._post('posts', data=post_data)
post = response.json()
post_id = post['id']
print(f"Beitrag '{title}' erstellt (ID: {post_id}, Status: {status})")
return post_id
except requests.exceptions.RequestException as e:
print(f"Fehler beim Erstellen des Beitrags: {e}")
if hasattr(e.response, 'text'):
print(f"Details: {e.response.text}")
return None
def get_categories(self) -> List[Dict[str, Any]]:
"""Holt alle verfügbaren Kategorien"""
try:
response = self._get('categories', params={'per_page': 100})
return response.json()
except requests.exceptions.RequestException as e:
print(f"Fehler beim Abrufen der Kategorien: {e}")
return []
def get_or_create_category(self, name: str) -> Optional[int]:
"""Holt oder erstellt eine Kategorie"""
categories = self.get_categories()
for cat in categories:
if cat['name'].lower() == name.lower():
return cat['id']
# Kategorie erstellen
try:
response = self._post('categories', data={'name': name})
return response.json()['id']
except requests.exceptions.RequestException as e:
# Prüfe ob Fehler durch bereits existierende Kategorie
if e.response is not None and e.response.status_code == 400:
# Kategorie könnte durch Race Condition gerade erstellt worden sein
# Erneut suchen
categories = self.get_categories()
for cat in categories:
if cat['name'].lower() == name.lower():
return cat['id']
print(f"Fehler beim Erstellen der Kategorie '{name}': {e}")
if hasattr(e, 'response') and e.response is not None:
try:
error_data = e.response.json()
print(f"Details: {error_data}")
except:
print(f"Response: {e.response.text}")
return None
def get_tags(self) -> List[Dict[str, Any]]:
"""Holt alle verfügbaren Tags"""
try:
response = self._get('tags', params={'per_page': 100})
return response.json()
except requests.exceptions.RequestException as e:
print(f"Fehler beim Abrufen der Tags: {e}")
return []
def get_or_create_tag(self, name: str) -> Optional[int]:
"""Holt oder erstellt einen Tag"""
tags = self.get_tags()
for tag in tags:
if tag['name'].lower() == name.lower():
return tag['id']
# Tag erstellen
try:
response = self._post('tags', data={'name': name})
return response.json()['id']
except requests.exceptions.RequestException as e:
# Prüfe ob Fehler durch bereits existierenden Tag
if e.response is not None and e.response.status_code == 400:
# Tag könnte durch Race Condition gerade erstellt worden sein
# Erneut suchen
tags = self.get_tags()
for tag in tags:
if tag['name'].lower() == name.lower():
return tag['id']
print(f"Fehler beim Erstellen des Tags '{name}': {e}")
if hasattr(e, 'response') and e.response is not None:
try:
error_data = e.response.json()
print(f"Details: {error_data}")
except:
print(f"Response: {e.response.text}")
return None
def calculate_file_hash(file_path: str) -> str:
"""Berechnet MD5-Hash einer Datei"""
hash_md5 = hashlib.md5()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()