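"""Export Forgejo issues to Markdown files.

Fetches issues (and optionally their comments) from the Forgejo API,
writes one Markdown file per issue, skips unchanged issues on re-runs by
tracking content hashes in a metadata file, and generates an index.md
overview of all exported issues.
"""
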
import os
import json
import argparse
import datetime
import hashlib
from typing import Dict, List, Any

import requests
from dotenv import load_dotenv

load_dotenv()

# Configuration
FORGEJO_URL = os.getenv("FORGEJO_URL", "https://forgejo.de")
API_BASE = f"{FORGEJO_URL}/api/v1"

# For authentication (if required)
TOKEN = os.getenv("FORGEJO_TOKEN")

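# Illustrative .env contents (the variable names are what this script reads;
# the values are placeholders, not real endpoints or credentials):
#
#   FORGEJO_URL=https://forgejo.example.org
#   FORGEJO_TOKEN=<personal-access-token>
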
class ForgejoMarkdownExporter:
    def __init__(
        self,
        repo_owner: str,
        repo_name: str,
        output_dir: str,
        include_comments: bool = True,
        include_closed: bool = True,
        fetch_all_repos: bool = False
    ):
        self.repo_owner = repo_owner
        self.repo_name = repo_name
        self.output_dir = output_dir
        self.include_comments = include_comments
        self.include_closed = include_closed
        self.fetch_all_repos = fetch_all_repos
        self.metadata_file = os.path.join(output_dir, "_metadata.json")

        # Create the output directory if it does not exist yet
        os.makedirs(output_dir, exist_ok=True)

    def get_headers(self) -> Dict[str, str]:
        """Returns the HTTP headers for the API requests."""
        headers = {
            "User-Agent": "ForgejoMarkdownExporter/1.0",
            "Accept": "application/json"
        }

        # Add the token if one is configured (TOKEN is a module-level
        # constant, so a plain truthiness check is sufficient)
        if TOKEN:
            headers["Authorization"] = f"token {TOKEN}"

        return headers

    def fetch_repos(self) -> List[Dict[str, Any]]:
        """Fetches all repositories of the given owner."""
        if not self.fetch_all_repos:
            # Only a single, specific repo was requested
            return [{
                "owner": {"username": self.repo_owner},
                "name": self.repo_name,
                "full_name": f"{self.repo_owner}/{self.repo_name}"
            }]

        # Note: this endpoint only works for organizations; for a user account
        # the /users/{owner}/repos endpoint would apply. Only the first page
        # of results is fetched here.
        repos_url = f"{API_BASE}/orgs/{self.repo_owner}/repos"
        try:
            response = requests.get(repos_url, headers=self.get_headers(), timeout=30)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error fetching repositories: {e}")
            # Fall back to a single repo
            return [{
                "owner": {"username": self.repo_owner},
                "name": self.repo_name,
                "full_name": f"{self.repo_owner}/{self.repo_name}"
            }]

    def fetch_issues(self, repo_full_name: str) -> List[Dict[str, Any]]:
        """Fetches all issues of a repository, following pagination."""
        issues = []
        page = 1
        per_page = 50

        while True:
            # Note: as in Gitea, this endpoint may also return pull requests;
            # a "type" filter parameter can be used to restrict it to issues.
            issues_url = f"{API_BASE}/repos/{repo_full_name}/issues"
            params = {
                "page": page,
                "per_page": per_page,
                "state": "all" if self.include_closed else "open",
            }
            try:
                response = requests.get(issues_url, headers=self.get_headers(), params=params, timeout=30)
                response.raise_for_status()
                page_issues = response.json()

                if not page_issues:
                    break

                issues.extend(page_issues)
                page += 1
            except Exception as e:
                print(f"Error fetching issues for {repo_full_name}: {e}")
                break

        return issues

    def fetch_comments(self, repo_full_name: str, issue_number: int) -> List[Dict[str, Any]]:
        """Fetches all comments on an issue."""
        if not self.include_comments:
            return []

        comments_url = f"{API_BASE}/repos/{repo_full_name}/issues/{issue_number}/comments"
        try:
            response = requests.get(comments_url, headers=self.get_headers(), timeout=30)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error fetching comments for issue #{issue_number}: {e}")
            return []

    def generate_markdown(self, issue: Dict[str, Any], comments: List[Dict[str, Any]], repo_full_name: str) -> str:
        """Builds the Markdown content for an issue."""
        # Basic information
        issue_number = issue.get('number', 'unknown')
        title = issue.get('title', 'No Title')
        state = issue.get('state', 'unknown')
        created_at = issue.get('created_at', 'unknown')
        updated_at = issue.get('updated_at', 'unknown')
        html_url = issue.get('html_url', '')

        # Author information
        author = "Unknown"
        if issue.get('user') and issue['user'].get('username'):
            author = issue['user']['username']

        # Markdown header
        markdown = f"# [{repo_full_name}] Issue #{issue_number}: {title}\n\n"

        # Metadata section
        markdown += "## Metadata\n\n"
        markdown += f"- **Issue ID:** {issue_number}\n"
        markdown += f"- **Repository:** {repo_full_name}\n"
        markdown += f"- **Author:** {author}\n"
        markdown += f"- **State:** {state}\n"
        markdown += f"- **Created:** {created_at}\n"
        markdown += f"- **Updated:** {updated_at}\n"
        markdown += f"- **URL:** {html_url}\n\n"

        # Labels
        if issue.get('labels'):
            markdown += "## Labels\n\n"
            for label in issue['labels']:
                markdown += f"- {label.get('name', 'Unnamed label')}\n"
            markdown += "\n"

        # Milestone
        if issue.get('milestone'):
            milestone = issue['milestone']
            markdown += "## Milestone\n\n"
            markdown += f"- **Title:** {milestone.get('title', 'No title')}\n"
            markdown += f"- **State:** {milestone.get('state', 'unknown')}\n"

            if milestone.get('due_on'):
                markdown += f"- **Due date:** {milestone.get('due_on')}\n"

            if milestone.get('description'):
                markdown += f"\n**Description:**\n\n{milestone.get('description')}\n\n"
            else:
                markdown += "\n"

        # Issue body
        markdown += "## Description\n\n"
        if issue.get('body'):
            markdown += f"{issue['body']}\n\n"
        else:
            markdown += "_No description provided._\n\n"

        # Comments
        if comments:
            markdown += "## Comments\n\n"
            for i, comment in enumerate(comments, 1):
                user = "Unknown"
                if comment.get('user') and comment['user'].get('username'):
                    user = comment['user']['username']

                created_at = comment.get('created_at', 'unknown date')
                body = comment.get('body', 'No content')

                markdown += f"### Comment {i} by {user} on {created_at}\n\n"
                markdown += f"{body}\n\n"
                markdown += "---\n\n"

        return markdown

    def save_issue_to_file(self, issue: Dict[str, Any], repo_full_name: str, comments: List[Dict[str, Any]]) -> str:
        """Saves an issue as a Markdown file and returns the file path."""
        issue_number = issue.get('number', 'unknown')
        title = issue.get('title', 'No Title')

        # Build a filesystem-safe file name (slashes, backslashes and other
        # special characters all become underscores)
        safe_title = ''.join(c if c.isalnum() or c in ' -_' else '_' for c in title)

        # Create a subfolder per repository
        repo_dir = os.path.join(self.output_dir, repo_full_name.replace('/', '_'))
        os.makedirs(repo_dir, exist_ok=True)

        # File name without a repo prefix
        filename = f"issue_{issue_number}_{safe_title[:50]}.md"
        filepath = os.path.join(repo_dir, filename)

        # Render the Markdown (the comments were already fetched by the
        # caller, so no extra API round-trip is needed here)
        markdown_content = self.generate_markdown(issue, comments, repo_full_name)

        # Write to file
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(markdown_content)

        print(f"Issue #{issue_number} saved: {filepath}")
        return filepath

    def calculate_issue_hash(self, issue: Dict[str, Any], comments: List[Dict[str, Any]]) -> str:
        """Calculates a hash over an issue and its comments to detect changes."""
        # Combine the data relevant for change detection
        hash_data = {
            "issue_id": issue.get('id'),
            "title": issue.get('title'),
            "body": issue.get('body'),
            "state": issue.get('state'),
            "updated_at": issue.get('updated_at'),
            "comments": [
                {
                    "id": comment.get('id'),
                    "body": comment.get('body'),
                    "updated_at": comment.get('updated_at')
                }
                for comment in comments
            ]
        }

        # Serialize to a canonical JSON string and hash it; MD5 is used here
        # only as a cheap change fingerprint, not for security
        json_str = json.dumps(hash_data, sort_keys=True)
        return hashlib.md5(json_str.encode('utf-8')).hexdigest()

    def load_metadata(self) -> Dict[str, Any]:
        """Loads metadata from file, if present."""
        if not os.path.exists(self.metadata_file):
            return {"issues": {}, "last_update": None}

        try:
            with open(self.metadata_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"Error loading metadata: {e}")
            return {"issues": {}, "last_update": None}

    def save_metadata(self, metadata: Dict[str, Any]):
        """Saves metadata to file."""
        try:
            with open(self.metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2)
        except Exception as e:
            print(f"Error saving metadata: {e}")

    def export(self) -> int:
        """Exports all issues and returns the number of new or updated issues."""
        start_time = datetime.datetime.now()

        # Load previous metadata
        metadata = self.load_metadata()
        issue_hashes = metadata.get("issues", {})

        # Fetch repositories
        repos = self.fetch_repos()

        # Counters for the statistics
        total_issues = 0
        new_issues = 0
        updated_issues = 0
        unchanged_issues = 0

        # IDs of all issues seen in this run, used for cleanup below
        current_issue_ids = set()

        # Process each repository
        for repo in repos:
            repo_full_name = repo.get('full_name')
            if not repo_full_name:
                continue

            print(f"\nProcessing repository: {repo_full_name}")

            # Fetch issues
            issues = self.fetch_issues(repo_full_name)
            print(f"  {len(issues)} issues found")

            # Process each issue
            for issue in issues:
                issue_id = str(issue.get('id', ''))
                if not issue_id:
                    continue

                total_issues += 1
                current_issue_ids.add(issue_id)

                # Fetch comments once; they are reused for both the hash
                # and the Markdown file
                comments = self.fetch_comments(repo_full_name, issue.get('number', 0))

                # Calculate the hash used to detect changes
                current_hash = self.calculate_issue_hash(issue, comments)

                # Check whether the issue has changed
                if issue_id in issue_hashes and issue_hashes[issue_id]["hash"] == current_hash:
                    unchanged_issues += 1
                    print(f"  Issue #{issue.get('number')} unchanged - skipping")
                    continue

                # The issue is new or has changed
                filepath = self.save_issue_to_file(issue, repo_full_name, comments)

                # Update the metadata
                if issue_id in issue_hashes:
                    updated_issues += 1
                else:
                    new_issues += 1

                # Store the output file path relative to the output directory,
                # normalized to forward slashes so Markdown links work on any OS
                file_rel_path = os.path.relpath(filepath, self.output_dir).replace(os.sep, '/')
                issue_hashes[issue_id] = {
                    "hash": current_hash,
                    "number": issue.get('number'),
                    "repo": repo_full_name,
                    "title": issue.get('title'),
                    "file": file_rel_path,
                    "updated_at": issue.get('updated_at'),
                    "state": issue.get('state')
                }

        # Remove deleted issues from the metadata file
        deleted_issues = []
        for issue_id in list(issue_hashes.keys()):
            if issue_id not in current_issue_ids:
                # Delete the associated file
                file_path = os.path.join(self.output_dir, issue_hashes[issue_id].get("file", ""))
                if os.path.exists(file_path):
                    try:
                        os.remove(file_path)
                        print(f"Removed deleted issue: {file_path}")
                    except Exception as e:
                        print(f"Error deleting file {file_path}: {e}")

                deleted_issues.append(issue_id)
                del issue_hashes[issue_id]

        # Update metadata
        metadata["issues"] = issue_hashes
        metadata["last_update"] = datetime.datetime.now().isoformat()
        self.save_metadata(metadata)

        # Print statistics
        end_time = datetime.datetime.now()
        duration = (end_time - start_time).total_seconds()

        print("\n" + "=" * 50)
        print(f"Export finished in {duration:.2f} seconds")
        print(f"Issues processed: {total_issues}")
        print(f"New issues: {new_issues}")
        print(f"Updated issues: {updated_issues}")
        print(f"Unchanged issues: {unchanged_issues}")
        print(f"Deleted issues: {len(deleted_issues)}")
        print("=" * 50)

        # Create an index.md with a summary
        self.create_index_file(total_issues, new_issues, updated_issues, deleted_issues)

        return new_issues + updated_issues

    def create_index_file(self, total: int, new: int, updated: int, deleted: List[str]):
        """Creates an index file with an overview of all issues."""
        index_path = os.path.join(self.output_dir, "index.md")

        # Load metadata
        metadata = self.load_metadata()
        issues = metadata.get("issues", {})
        last_update = metadata.get("last_update", "unknown")

        try:
            with open(index_path, 'w', encoding='utf-8') as f:
                f.write("# Forgejo Issues Overview\n\n")
                f.write(f"Last update: {last_update}\n\n")

                # Statistics
                f.write("## Statistics\n\n")
                f.write(f"- **Total issues:** {total}\n")
                f.write(f"- **New issues in last run:** {new}\n")
                f.write(f"- **Updated issues in last run:** {updated}\n")
                f.write(f"- **Deleted issues in last run:** {len(deleted)}\n\n")

                # Group by repository
                repos = {}
                for issue_id, issue_data in issues.items():
                    repo = issue_data.get("repo", "unknown")
                    repos.setdefault(repo, []).append(issue_data)

                # Sort issues by number within each repo
                for repo in repos:
                    repos[repo].sort(key=lambda x: x.get("number", 0))

                # List all issues grouped by repository
                f.write("## Issues by Repository\n\n")

                for repo, repo_issues in sorted(repos.items()):
                    f.write(f"### {repo}\n\n")

                    # Open issues
                    open_issues = [i for i in repo_issues if i.get("state") == "open"]
                    if open_issues:
                        f.write("#### Open Issues\n\n")
                        for issue in open_issues:
                            number = issue.get("number", "?")
                            title = issue.get("title", "No title")
                            file = issue.get("file", "")
                            # Angle brackets keep the link valid even if the
                            # file name contains spaces (CommonMark)
                            f.write(f"- [#{number}: {title}](<{file}>)\n")
                        f.write("\n")

                    # Closed issues
                    closed_issues = [i for i in repo_issues if i.get("state") == "closed"]
                    if closed_issues:
                        f.write("#### Closed Issues\n\n")
                        for issue in closed_issues:
                            number = issue.get("number", "?")
                            title = issue.get("title", "No title")
                            file = issue.get("file", "")
                            f.write(f"- [#{number}: {title}](<{file}>)\n")
                        f.write("\n")

                    f.write("\n")

            print(f"Index file created: {index_path}")

        except Exception as e:
            print(f"Error creating index file: {e}")

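# Programmatic use (a sketch; "alice" and "notes" are placeholder
# owner/repo names, not values from a real setup):
#
#   exporter = ForgejoMarkdownExporter("alice", "notes", "./out",
#                                      include_comments=True, include_closed=True)
#   exporter.export()
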
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Exports Forgejo issues to Markdown files")
    parser.add_argument("--owner", required=True, help="Repository owner (user or organization)")
    parser.add_argument("--repo", required=True, help="Repository name, or 'all' for all repositories")
    parser.add_argument("--output", default="./forgejo_issues", help="Output folder for the Markdown files")
    parser.add_argument("--comments", action="store_true", help="Include comments")
    parser.add_argument("--closed", action="store_true", help="Include closed issues")

    args = parser.parse_args()

    fetch_all = args.repo.lower() == "all"
    repo_name = "" if fetch_all else args.repo

    exporter = ForgejoMarkdownExporter(
        repo_owner=args.owner,
        repo_name=repo_name,
        output_dir=args.output,
        include_comments=args.comments,
        include_closed=args.closed,
        fetch_all_repos=fetch_all
    )

    num_updated = exporter.export()
    print(f"\nResult: {num_updated} issues were updated or newly added.")