Fetch builds from Jenkins API
This commit is contained in:
@@ -1,7 +1,12 @@
|
|||||||
|
import base64
|
||||||
import json
|
import json
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, List
|
from typing import Dict, List
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from app.settings import settings
|
||||||
|
|
||||||
DATA_DIR = Path(__file__).resolve().parent.parent / "data"
|
DATA_DIR = Path(__file__).resolve().parent.parent / "data"
|
||||||
|
|
||||||
|
|
||||||
@@ -14,9 +19,50 @@ def _load_json(filename: str) -> Dict:
|
|||||||
def _sort_builds(builds: List[Dict]) -> List[Dict]:
|
def _sort_builds(builds: List[Dict]) -> List[Dict]:
|
||||||
return sorted(builds, key=lambda build: build.get("number", 0), reverse=True)
|
return sorted(builds, key=lambda build: build.get("number", 0), reverse=True)
|
||||||
|
|
||||||
|
def normalize_build(build: Dict) -> Dict:
|
||||||
|
changes = build.get("changeSets", [])
|
||||||
|
commits = []
|
||||||
|
|
||||||
|
for cs in changes:
|
||||||
|
for item in cs.get("items", []):
|
||||||
|
commits.append({
|
||||||
|
"commit": item.get("commitId", "")[:7],
|
||||||
|
"message": item.get("msg", ""),
|
||||||
|
"author": item.get("author", {}).get("fullName", "unknown"),
|
||||||
|
})
|
||||||
|
|
||||||
|
return {
|
||||||
|
"number": build.get("number"),
|
||||||
|
"status": (build.get("result") or "RUNNING").lower(),
|
||||||
|
"finished_at": build.get("timestamp"),
|
||||||
|
"duration_seconds": build.get("duration", 0) // 1000,
|
||||||
|
"url": build.get("url"),
|
||||||
|
"commits": commits,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _auth_header() -> Dict[str, str]:
|
||||||
|
token = f"{settings.jenkins_user}:{settings.jenkins_token}"
|
||||||
|
encoded = base64.b64encode(token.encode()).decode()
|
||||||
|
return {"Authorization": f"Basic {encoded}"}
|
||||||
|
|
||||||
|
def fetch_builds(limit: int = 5) -> List[Dict]:
|
||||||
|
url = (
|
||||||
|
f"{settings.jenkins_base_url}/job/{settings.jenkins_job_name}/api/json"
|
||||||
|
"?tree=builds[number,url,result,timestamp,duration,"
|
||||||
|
"changesets[items[commitId,msg,author[fullName]]]]"
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = requests.get(url, headers = _auth_header(), timeout=5)
|
||||||
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
builds = resp.json().get("builds", [])
|
||||||
|
return builds[:limit]
|
||||||
|
|
||||||
def build_history() -> Dict:
|
def build_history() -> Dict:
|
||||||
"""Return Jenkins build history data."""
|
"""Return Jenkins build history data."""
|
||||||
history = _load_json("build_history.json")
|
history = _load_json("build_history.json")
|
||||||
builds = history.get("builds", [])
|
builds = history.get("builds", [])
|
||||||
return {"builds": _sort_builds(builds)}
|
return {
|
||||||
|
"builds": [normalize_build(b) for b in builds]
|
||||||
|
}
|
||||||
|
|||||||
@@ -8,6 +8,10 @@ class RuntimeConfig:
|
|||||||
git_commit: str = os.getenv("GIT_COMMIT", "local")
|
git_commit: str = os.getenv("GIT_COMMIT", "local")
|
||||||
build_number: str = os.getenv("BUILD_NUMBER", "-")
|
build_number: str = os.getenv("BUILD_NUMBER", "-")
|
||||||
commit_author: str = os.getenv("COMMIT_AUTHOR", "local")
|
commit_author: str = os.getenv("COMMIT_AUTHOR", "local")
|
||||||
|
jenkins_base_url: str = os.getenv("JENKINS_BASE_URL", "localhost:8080")
|
||||||
|
jenkins_job_name: str = os.getenv("JENKINS_JOB_NAME", "")
|
||||||
|
jenkins_user: str = os.getenv("JENKINS_USER", "")
|
||||||
|
jenkins_token: str = os.getenv("JENKINS_TOKEN", "")
|
||||||
|
|
||||||
|
|
||||||
settings = RuntimeConfig()
|
settings = RuntimeConfig()
|
||||||
|
|||||||
Reference in New Issue
Block a user