git_issue_importer/forgejo_exporter.py
2025-03-10 18:06:17 +01:00

459 lines
No EOL
18 KiB
Python

import os
import sys
import json
import argparse
import datetime
import requests
import hashlib
from pathlib import Path
from typing import Dict, List, Any, Optional
from dotenv import load_dotenv
load_dotenv()
# Konfiguration
FORGEJO_URL = os.getenv("FORGEJO_URL", "https://forgejo.de")
API_BASE = f"{FORGEJO_URL}/api/v1"
# Für die Authentifizierung (falls erforderlich)
TOKEN = os.getenv("FORGEJO_TOKEN")
class ForgejoMarkdownExporter:
    """Export Forgejo issues (optionally with comments) to Markdown files.

    One Markdown file is written per issue. A ``_metadata.json`` file in the
    output directory stores a content hash per issue, so unchanged issues are
    skipped on later runs and files for deleted issues are removed.
    """

    # Seconds before a single HTTP request is aborted. Without a timeout,
    # `requests` would block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(
        self,
        repo_owner: str,
        repo_name: str,
        output_dir: str,
        include_comments: bool = True,
        include_closed: bool = True,
        fetch_all_repos: bool = False
    ):
        """Store the export configuration and create the output directory.

        Args:
            repo_owner: User or organisation owning the repository/repositories.
            repo_name: Repository name (ignored when ``fetch_all_repos`` is True).
            output_dir: Directory the Markdown files are written into.
            include_comments: Also fetch and render issue comments.
            include_closed: Also export closed issues.
            fetch_all_repos: Export every repository of ``repo_owner``.
        """
        self.repo_owner = repo_owner
        self.repo_name = repo_name
        self.output_dir = output_dir
        self.include_comments = include_comments
        self.include_closed = include_closed
        self.fetch_all_repos = fetch_all_repos
        self.metadata_file = os.path.join(output_dir, "_metadata.json")
        # Create the output directory if it does not exist yet.
        os.makedirs(output_dir, exist_ok=True)

    def get_headers(self) -> Dict[str, str]:
        """Return the HTTP headers used for every API request."""
        headers = {
            "User-Agent": "ForgejoMarkdownExporter/1.0",
            "Accept": "application/json"
        }
        # Fix: TOKEN is always bound at module level, so the previous
        # `"TOKEN" in globals()` guard was redundant; truthiness suffices.
        if TOKEN:
            headers["Authorization"] = f"token {TOKEN}"
        return headers

    def _single_repo_stub(self) -> List[Dict[str, Any]]:
        """Minimal repo record describing the single configured repository."""
        return [{
            "owner": {"username": self.repo_owner},
            "name": self.repo_name,
            "full_name": f"{self.repo_owner}/{self.repo_name}"
        }]

    def fetch_repos(self) -> List[Dict[str, Any]]:
        """Return all repositories of the owner, or the single configured one.

        Fix: the repo-listing endpoint is paginated; the previous version
        only ever received the first page. NOTE(review): the ``/orgs/...``
        endpoint works for organisations only, not user accounts — confirm
        against the Forgejo API before relying on ``--repo all`` for users.
        """
        if not self.fetch_all_repos:
            # Only a specific repository was requested.
            return self._single_repo_stub()
        repos: List[Dict[str, Any]] = []
        page = 1
        while True:
            try:
                repos_url = f"{API_BASE}/orgs/{self.repo_owner}/repos?page={page}&per_page=50"
                response = requests.get(
                    repos_url,
                    headers=self.get_headers(),
                    timeout=self.REQUEST_TIMEOUT,
                )
                response.raise_for_status()
                page_repos = response.json()
            except Exception as e:
                print(f"Fehler beim Abrufen der Repositories: {e}")
                # Keep anything already fetched; otherwise fall back to the
                # single configured repository, as before.
                return repos if repos else self._single_repo_stub()
            if not page_repos:
                break
            repos.extend(page_repos)
            page += 1
        return repos

    def fetch_issues(self, repo_full_name: str) -> List[Dict[str, Any]]:
        """Fetch all issues of a repository, following pagination.

        Returns an empty (or partial) list if a request fails; the error is
        logged to stdout rather than raised, matching the original contract.
        """
        issues: List[Dict[str, Any]] = []
        page = 1
        per_page = 50
        while True:
            try:
                state = 'all' if self.include_closed else 'open'
                issues_url = (
                    f"{API_BASE}/repos/{repo_full_name}/issues"
                    f"?page={page}&per_page={per_page}&state={state}"
                )
                response = requests.get(
                    issues_url,
                    headers=self.get_headers(),
                    timeout=self.REQUEST_TIMEOUT,
                )
                response.raise_for_status()
                page_issues = response.json()
                if not page_issues:
                    break
                issues.extend(page_issues)
                page += 1
            except Exception as e:
                print(f"Fehler beim Abrufen der Issues für {repo_full_name}: {e}")
                break
        return issues

    def fetch_comments(self, repo_full_name: str, issue_number: int) -> List[Dict[str, Any]]:
        """Fetch all comments of one issue; [] when comments are disabled or on error."""
        if not self.include_comments:
            return []
        try:
            comments_url = f"{API_BASE}/repos/{repo_full_name}/issues/{issue_number}/comments"
            response = requests.get(
                comments_url,
                headers=self.get_headers(),
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Fehler beim Abrufen der Kommentare für Issue #{issue_number}: {e}")
            return []

    def generate_markdown(self, issue: Dict[str, Any], comments: List[Dict[str, Any]], repo_full_name: str) -> str:
        """Render a single issue (plus its comments) as a Markdown document."""
        # Basic issue fields, with safe fallbacks for missing keys.
        issue_number = issue.get('number', 'unknown')
        title = issue.get('title', 'No Title')
        state = issue.get('state', 'unknown')
        created_at = issue.get('created_at', 'unknown')
        updated_at = issue.get('updated_at', 'unknown')
        html_url = issue.get('html_url', '')
        # Author — NOTE(review): assumes the API user object carries a
        # 'username' key; confirm against the Forgejo API schema.
        author = "Unbekannt"
        if issue.get('user') and issue['user'].get('username'):
            author = issue['user']['username']
        # Build the document from parts and join once (avoids repeated
        # string concatenation).
        parts: List[str] = [f"# [{repo_full_name}] Issue #{issue_number}: {title}\n\n"]
        # Metadata section.
        parts.append("## Metadaten\n\n")
        parts.append(f"- **Issue ID:** {issue_number}\n")
        parts.append(f"- **Repository:** {repo_full_name}\n")
        parts.append(f"- **Autor:** {author}\n")
        parts.append(f"- **Status:** {state}\n")
        parts.append(f"- **Erstellt am:** {created_at}\n")
        parts.append(f"- **Aktualisiert am:** {updated_at}\n")
        parts.append(f"- **URL:** {html_url}\n\n")
        # Labels.
        if issue.get('labels') and len(issue['labels']) > 0:
            parts.append("## Labels\n\n")
            for label in issue['labels']:
                parts.append(f"- {label.get('name', 'Unbekanntes Label')}\n")
            parts.append("\n")
        # Milestone.
        if issue.get('milestone'):
            milestone = issue['milestone']
            parts.append("## Milestone\n\n")
            parts.append(f"- **Titel:** {milestone.get('title', 'Kein Titel')}\n")
            parts.append(f"- **Status:** {milestone.get('state', 'unbekannt')}\n")
            if milestone.get('due_on'):
                parts.append(f"- **Fälligkeitsdatum:** {milestone.get('due_on')}\n")
            if milestone.get('description'):
                parts.append(f"\n**Beschreibung:**\n\n{milestone.get('description')}\n\n")
            else:
                parts.append("\n")
        # Issue body.
        parts.append("## Beschreibung\n\n")
        if issue.get('body'):
            parts.append(f"{issue['body']}\n\n")
        else:
            parts.append("_Keine Beschreibung vorhanden._\n\n")
        # Comments.
        if comments and len(comments) > 0:
            parts.append("## Kommentare\n\n")
            for i, comment in enumerate(comments, 1):
                user = "Unbekannt"
                if comment.get('user') and comment['user'].get('username'):
                    user = comment['user']['username']
                # Use a distinct name so the issue's created_at above is
                # not clobbered (the original reused the variable).
                comment_date = comment.get('created_at', 'Unbekanntes Datum')
                body = comment.get('body', 'Kein Inhalt')
                parts.append(f"### Kommentar {i} von {user} am {comment_date}\n\n")
                parts.append(f"{body}\n\n")
                parts.append("---\n\n")
        return "".join(parts)

    def save_issue_to_file(self, issue: Dict[str, Any], repo_full_name: str,
                           comments: Optional[List[Dict[str, Any]]] = None) -> str:
        """Write one issue as a Markdown file and return the file path.

        Args:
            issue: Issue record as returned by the API.
            repo_full_name: ``owner/name`` of the repository.
            comments: Pre-fetched comments. Fix: ``export()`` already fetches
                comments for hashing; passing them here avoids a second,
                redundant API request per issue. ``None`` (the default)
                preserves the old self-fetching behavior for other callers.
        """
        issue_number = issue.get('number', 'unknown')
        title = issue.get('title', 'No Title').replace('/', '-').replace('\\', '-')
        # Build a filesystem-safe file name; the title is truncated to 50 chars.
        safe_title = ''.join(c if c.isalnum() or c in [' ', '-', '_'] else '_' for c in title)
        filename = f"{repo_full_name.replace('/', '_')}__issue_{issue_number}_{safe_title[:50]}.md"
        filepath = os.path.join(self.output_dir, filename)
        if comments is None:
            comments = self.fetch_comments(repo_full_name, issue_number)
        markdown_content = self.generate_markdown(issue, comments, repo_full_name)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(markdown_content)
        print(f"Issue #{issue_number} gespeichert: {filepath}")
        return filepath

    def calculate_issue_hash(self, issue: Dict[str, Any], comments: List[Dict[str, Any]]) -> str:
        """Return a hash over the change-relevant fields of an issue + comments.

        MD5 is used purely for change detection, not security; keeping the
        algorithm stable preserves compatibility with existing metadata files.
        """
        hash_data = {
            "issue_id": issue.get('id'),
            "title": issue.get('title'),
            "body": issue.get('body'),
            "state": issue.get('state'),
            "updated_at": issue.get('updated_at'),
            "comments": [
                {
                    "id": comment.get('id'),
                    "body": comment.get('body'),
                    "updated_at": comment.get('updated_at')
                }
                for comment in comments
            ]
        }
        # sort_keys makes the JSON serialization, and thus the hash, stable.
        json_str = json.dumps(hash_data, sort_keys=True)
        return hashlib.md5(json_str.encode('utf-8')).hexdigest()

    def load_metadata(self) -> Dict[str, Any]:
        """Load the metadata file; return an empty structure if missing/broken."""
        if not os.path.exists(self.metadata_file):
            return {"issues": {}, "last_update": None}
        try:
            with open(self.metadata_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"Fehler beim Laden der Metadaten: {e}")
            return {"issues": {}, "last_update": None}

    def save_metadata(self, metadata: Dict[str, Any]):
        """Persist the metadata dict as pretty-printed JSON (best-effort)."""
        try:
            with open(self.metadata_file, 'w', encoding='utf-8') as f:
                # ensure_ascii=False keeps umlauts etc. readable in the file.
                json.dump(metadata, f, indent=2, ensure_ascii=False)
        except Exception as e:
            print(f"Fehler beim Speichern der Metadaten: {e}")

    def export(self) -> int:
        """Run a full export and return the number of new + updated issues."""
        start_time = datetime.datetime.now()
        metadata = self.load_metadata()
        issue_hashes = metadata.get("issues", {})
        repos = self.fetch_repos()
        # Statistics counters.
        total_issues = 0
        new_issues = 0
        updated_issues = 0
        unchanged_issues = 0
        # Fix: a set gives O(1) membership tests in the deletion scan below;
        # the original list made that scan O(n) per lookup (O(n^2) overall).
        current_issue_ids: set = set()
        for repo in repos:
            repo_full_name = repo.get('full_name')
            if not repo_full_name:
                continue
            print(f"\nVerarbeite Repository: {repo_full_name}")
            issues = self.fetch_issues(repo_full_name)
            print(f" {len(issues)} Issues gefunden")
            for issue in issues:
                issue_id = str(issue.get('id', ''))
                if not issue_id:
                    continue
                total_issues += 1
                current_issue_ids.add(issue_id)
                # Comments are needed for the change hash; they are reused
                # below so they are only fetched once per issue.
                comments = self.fetch_comments(repo_full_name, issue.get('number', 0))
                current_hash = self.calculate_issue_hash(issue, comments)
                previous = issue_hashes.get(issue_id)
                # .get("hash") tolerates malformed metadata entries instead
                # of raising KeyError as the original indexing did.
                if previous and previous.get("hash") == current_hash:
                    unchanged_issues += 1
                    print(f" Issue #{issue.get('number')} unverändert - überspringe")
                    continue
                # New or changed issue: write the file (reusing comments).
                filepath = self.save_issue_to_file(issue, repo_full_name, comments)
                if previous:
                    updated_issues += 1
                else:
                    new_issues += 1
                issue_hashes[issue_id] = {
                    "hash": current_hash,
                    "number": issue.get('number'),
                    "repo": repo_full_name,
                    "title": issue.get('title'),
                    "file": os.path.basename(filepath),
                    "updated_at": issue.get('updated_at'),
                    "state": issue.get('state')
                }
        # Remove issues that no longer exist upstream.
        # NOTE(review): this also removes entries for repositories that were
        # simply not fetched this run (e.g. after switching --repo) — confirm
        # that is the intended behavior before changing it.
        deleted_issues = []
        for issue_id in list(issue_hashes.keys()):
            if issue_id not in current_issue_ids:
                file_path = os.path.join(self.output_dir, issue_hashes[issue_id].get("file", ""))
                if os.path.exists(file_path):
                    try:
                        os.remove(file_path)
                        print(f"Gelöschtes Issue entfernt: {file_path}")
                    except Exception as e:
                        print(f"Fehler beim Löschen der Datei {file_path}: {e}")
                deleted_issues.append(issue_id)
                del issue_hashes[issue_id]
        # Persist updated metadata.
        metadata["issues"] = issue_hashes
        metadata["last_update"] = datetime.datetime.now().isoformat()
        self.save_metadata(metadata)
        # Print statistics.
        end_time = datetime.datetime.now()
        duration = (end_time - start_time).total_seconds()
        print("\n" + "="*50)
        print(f"Export abgeschlossen in {duration:.2f} Sekunden")
        print(f"Verarbeitete Issues: {total_issues}")
        print(f"Neue Issues: {new_issues}")
        print(f"Aktualisierte Issues: {updated_issues}")
        print(f"Unveränderte Issues: {unchanged_issues}")
        print(f"Gelöschte Issues: {len(deleted_issues)}")
        print("="*50)
        # Write the summary index.md.
        self.create_index_file(total_issues, new_issues, updated_issues, deleted_issues)
        return new_issues + updated_issues

    def create_index_file(self, total: int, new: int, updated: int, deleted: List[str]):
        """Write index.md: statistics plus all issues grouped by repository."""
        index_path = os.path.join(self.output_dir, "index.md")
        metadata = self.load_metadata()
        issues = metadata.get("issues", {})
        last_update = metadata.get("last_update", "Unbekannt")
        try:
            with open(index_path, 'w', encoding='utf-8') as f:
                f.write(f"# Forgejo Issues Übersicht\n\n")
                f.write(f"Letzte Aktualisierung: {last_update}\n\n")
                # Statistics block.
                f.write("## Statistiken\n\n")
                f.write(f"- **Gesamtzahl der Issues:** {total}\n")
                f.write(f"- **Neue Issues bei letzter Aktualisierung:** {new}\n")
                f.write(f"- **Aktualisierte Issues bei letzter Aktualisierung:** {updated}\n")
                f.write(f"- **Gelöschte Issues bei letzter Aktualisierung:** {len(deleted)}\n\n")
                # Group issues by repository (keys are unused, so iterate values).
                repos: Dict[str, List[Dict[str, Any]]] = {}
                for issue_data in issues.values():
                    repos.setdefault(issue_data.get("repo", "Unbekannt"), []).append(issue_data)
                # Sort each repo's issues by number.
                for repo in repos:
                    repos[repo].sort(key=lambda x: x.get("number", 0))
                # Emit open then closed issues per repository.
                f.write("## Issues nach Repository\n\n")
                for repo, repo_issues in sorted(repos.items()):
                    f.write(f"### {repo}\n\n")
                    open_issues = [i for i in repo_issues if i.get("state") == "open"]
                    if open_issues:
                        f.write("#### Offene Issues\n\n")
                        for issue in open_issues:
                            number = issue.get("number", "?")
                            title = issue.get("title", "Kein Titel")
                            file = issue.get("file", "")
                            f.write(f"- [#{number}: {title}]({file})\n")
                        f.write("\n")
                    closed_issues = [i for i in repo_issues if i.get("state") == "closed"]
                    if closed_issues:
                        f.write("#### Geschlossene Issues\n\n")
                        for issue in closed_issues:
                            number = issue.get("number", "?")
                            title = issue.get("title", "Kein Titel")
                            file = issue.get("file", "")
                            f.write(f"- [#{number}: {title}]({file})\n")
                        f.write("\n")
                    f.write("\n")
            print(f"Index-Datei erstellt: {index_path}")
        except Exception as e:
            print(f"Fehler beim Erstellen der Index-Datei: {e}")
if __name__ == "__main__":
    # Command-line interface: owner + repo are mandatory, everything else
    # has sensible defaults.
    cli = argparse.ArgumentParser(description="Exportiert Forgejo-Issues in Markdown-Dateien")
    cli.add_argument("--owner", required=True, help="Repository-Besitzer (Benutzer oder Organisation)")
    cli.add_argument("--repo", required=True, help="Repository-Name oder 'all' für alle Repositories")
    cli.add_argument("--output", default="./forgejo_issues", help="Ausgabeordner für Markdown-Dateien")
    cli.add_argument("--comments", action="store_true", help="Kommentare einbeziehen")
    cli.add_argument("--closed", action="store_true", help="Geschlossene Issues einbeziehen")
    opts = cli.parse_args()

    # The literal repo name "all" (any case) switches to whole-owner export.
    wants_all = opts.repo.lower() == "all"

    exporter = ForgejoMarkdownExporter(
        repo_owner=opts.owner,
        repo_name="" if wants_all else opts.repo,
        output_dir=opts.output,
        include_comments=opts.comments,
        include_closed=opts.closed,
        fetch_all_repos=wants_all,
    )
    num_updated = exporter.export()
    print(f"\nErgebnis: {num_updated} Issues wurden aktualisiert oder neu hinzugefügt.")