Source code for julee.api.routers.workflows
"""
Workflows API router for the julee CEAP system.
This module provides workflow management API endpoints for starting,
monitoring, and managing workflows in the system.
Routes defined at root level:
- POST /extract-assemble - Start extract-assemble workflow
- GET /{workflow_id}/status - Get workflow status
- GET / - List workflows
These routes are mounted with '/workflows' prefix in the main app.
"""
import logging
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from temporalio.client import Client
from julee.api.dependencies import get_temporal_client
from julee.workflows.extract_assemble import (
ExtractAssembleWorkflow,
EXTRACT_ASSEMBLE_RETRY_POLICY,
)
[docs]
logger = logging.getLogger(__name__)
[docs]
class WorkflowStatusResponse(BaseModel):
"""Response model for workflow status."""
[docs]
status: str # "RUNNING", "COMPLETED", "FAILED", "CANCELLED", etc.
[docs]
current_step: Optional[str] = None
[docs]
assembly_id: Optional[str] = None
[docs]
class StartWorkflowResponse(BaseModel):
"""Response model for starting a workflow."""
@router.post("/extract-assemble", response_model=StartWorkflowResponse)
@router.get("/{workflow_id}/status", response_model=WorkflowStatusResponse)
[docs]
async def get_workflow_status(
workflow_id: str,
temporal_client: Client = Depends(get_temporal_client),
) -> WorkflowStatusResponse:
"""
Get the status of a workflow.
Args:
workflow_id: Workflow ID to query
temporal_client: Temporal client dependency
Returns:
Current workflow status and details
Raises:
HTTPException: If workflow not found or query fails
"""
logger.info("Getting workflow status", extra={"workflow_id": workflow_id})
# Get workflow handle - if this fails, workflow doesn't exist
try:
handle = temporal_client.get_workflow_handle(workflow_id)
except Exception as e:
# Check if it's a workflow not found error (common patterns)
error_message = str(e).lower()
if any(
pattern in error_message
for pattern in [
"not found",
"notfound",
"does not exist",
"workflow_not_found",
]
):
raise HTTPException(
status_code=404,
detail=f"Workflow with ID '{workflow_id}' not found",
)
# Other errors from getting workflow handle
logger.error(
"Failed to get workflow handle: %s",
e,
extra={"workflow_id": workflow_id},
)
raise HTTPException(
status_code=500, detail="Failed to retrieve workflow handle"
) from e
# Get workflow description - if this fails, it's a server error
try:
description = await handle.describe()
except Exception as e:
logger.error(
"Failed to describe workflow: %s",
e,
extra={"workflow_id": workflow_id},
)
raise HTTPException(
status_code=500, detail="Failed to retrieve workflow description"
) from e
# Query current step and assembly ID if workflow supports it
current_step = None
assembly_id = None
try:
current_step = await handle.query("get_current_step")
assembly_id = await handle.query("get_assembly_id")
except Exception as query_error:
logger.debug(
"Could not query workflow details: %s",
query_error,
extra={"workflow_id": workflow_id},
)
status_response = WorkflowStatusResponse(
workflow_id=workflow_id,
run_id=description.run_id or "unknown",
status=description.status.name if description.status else "UNKNOWN",
current_step=current_step,
assembly_id=assembly_id,
)
logger.info(
"Retrieved workflow status",
extra={
"workflow_id": workflow_id,
"status": status_response.status,
"current_step": current_step,
},
)
return status_response