Source code for julee.api.routers.system
"""
System API router for the julee CEAP system.
This module provides system-level API endpoints including health checks,
status information, and other operational endpoints.
Routes defined at root level:
- GET /health - Health check endpoint
These routes are mounted at the root level in the main app.
"""
import asyncio
import logging
import os
from datetime import datetime, timezone
from fastapi import APIRouter
from minio import Minio
from temporalio.client import Client
from julee.api.responses import (
HealthCheckResponse,
ServiceHealthStatus,
ServiceStatus,
SystemStatus,
)
[docs]
logger = logging.getLogger(__name__)
# Create the router for system endpoints
[docs]
async def check_temporal_health() -> ServiceStatus:
"""Check if Temporal service is available."""
try:
# Get Temporal server address from environment or use default
temporal_address = os.getenv(
"TEMPORAL_ENDPOINT", os.getenv("TEMPORAL_HOST", "localhost:7233")
)
# Create a client and try to connect
_ = await Client.connect(temporal_address, namespace="default")
# Simple check - if we can connect, assume it's working
return ServiceStatus.UP
except Exception as e:
logger.warning("Temporal health check failed: %s", e)
return ServiceStatus.DOWN
[docs]
async def check_storage_health() -> ServiceStatus:
"""Check if storage service (Minio) is available."""
try:
# Get Minio configuration (prioritize Docker network address)
endpoint = os.environ.get("MINIO_ENDPOINT", "localhost:9000")
access_key = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
secret_key = os.environ.get("MINIO_SECRET_KEY", "minioadmin")
secure = os.environ.get("MINIO_SECURE", "false").lower() == "true"
# Create Minio client
client = Minio(
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure,
)
# Test connection by listing buckets
_ = list(client.list_buckets())
return ServiceStatus.UP
except Exception as e:
logger.warning("Storage health check failed: %s", e)
return ServiceStatus.DOWN
[docs]
async def check_api_health() -> ServiceStatus:
"""Check if API service is available (self-check)."""
# Since we're responding, API is up
return ServiceStatus.UP
[docs]
def determine_overall_status(services: ServiceHealthStatus) -> SystemStatus:
"""Determine overall system status based on service statuses."""
service_statuses = [services.api, services.temporal, services.storage]
if all(status == ServiceStatus.UP for status in service_statuses):
return SystemStatus.HEALTHY
elif any(status == ServiceStatus.UP for status in service_statuses):
return SystemStatus.DEGRADED
else:
return SystemStatus.UNHEALTHY
@router.get("/health", response_model=HealthCheckResponse)
[docs]
async def health_check() -> HealthCheckResponse:
"""Comprehensive health check endpoint that checks all services."""
logger.info("Performing health check")
# Check all services concurrently
results = await asyncio.gather(
check_api_health(),
check_temporal_health(),
check_storage_health(),
return_exceptions=True,
)
# Handle any exceptions from the health checks
api_status = results[0]
temporal_status = results[1]
storage_status = results[2]
if isinstance(api_status, Exception):
logger.error("API health check error: %s", api_status)
api_status = ServiceStatus.DOWN
if isinstance(temporal_status, Exception):
logger.error("Temporal health check error: %s", temporal_status)
temporal_status = ServiceStatus.DOWN
if isinstance(storage_status, Exception):
logger.error("Storage health check error: %s", storage_status)
storage_status = ServiceStatus.DOWN
# Create service health status with proper typing
services = ServiceHealthStatus(
api=ServiceStatus(api_status),
temporal=ServiceStatus(temporal_status),
storage=ServiceStatus(storage_status),
)
# Determine overall status
overall_status = determine_overall_status(services)
# Return response with string timestamp as expected by frontend
return HealthCheckResponse(
status=overall_status,
timestamp=datetime.now(timezone.utc).isoformat(),
services=services,
)