"""
Anthropic implementation of KnowledgeService for the Capture, Extract,
Assemble, Publish workflow.
This module provides the Anthropic-specific implementation of the
KnowledgeService protocol. It handles interactions with Anthropic's API
for document registration and query execution.
Requirements:
- ANTHROPIC_API_KEY environment variable must be set
"""
import os
import logging
import time
import uuid
from typing import Optional, List, Dict, Any
from datetime import datetime, timezone
from anthropic import AsyncAnthropic
from julee.domain.models.knowledge_service_config import (
KnowledgeServiceConfig,
)
from julee.domain.models.document import Document
from ..knowledge_service import (
KnowledgeService,
QueryResult,
FileRegistrationResult,
)
# Module-level logger; handlers and levels are left to the application.
logger = logging.getLogger(__name__)

# Default configuration constants
# Model used by execute_query when query_metadata carries no "model" key.
DEFAULT_MODEL = "claude-sonnet-4-20250514"
# Response token cap used by execute_query when query_metadata carries no
# "max_tokens" key.
DEFAULT_MAX_TOKENS = 4000
class AnthropicKnowledgeService(KnowledgeService):
    """
    Anthropic implementation of the KnowledgeService protocol.

    This class handles interactions with Anthropic's API for document
    registration and query execution. It implements the KnowledgeService
    protocol with Anthropic-specific logic.

    The service keeps no state between calls: every public method builds its
    own client and closes it again before returning.
    """

    def __init__(self) -> None:
        """Initialize Anthropic knowledge service without configuration.

        Configuration will be provided per method call to maintain
        stateless operation compatible with Temporal workflows.
        """
        # No initialization needed - everything happens per method call

    def _get_client(self, config: KnowledgeServiceConfig) -> AsyncAnthropic:
        """Get an initialized Anthropic client.

        The caller owns the returned client and is responsible for closing
        it (for example with ``async with client:``).

        Args:
            config: KnowledgeServiceConfig (currently unused; kept for
                future extensibility)

        Returns:
            Configured AsyncAnthropic client instance

        Raises:
            ValueError: If ANTHROPIC_API_KEY environment variable is not set
        """
        api_key = os.environ.get("ANTHROPIC_API_KEY")
        if not api_key:
            raise ValueError(
                "ANTHROPIC_API_KEY environment variable is required for "
                "AnthropicKnowledgeService"
            )
        return AsyncAnthropic(
            api_key=api_key,
            # Beta header sent with every request so the Files API is usable
            default_headers={"anthropic-beta": "files-api-2025-04-14"},
        )

    @staticmethod
    def _build_messages(
        query_text: str,
        service_file_ids: Optional[List[str]],
        assistant_prompt: Optional[str],
    ) -> List[Dict[str, Any]]:
        """Build the ``messages`` payload: file attachments, query, prefill."""
        # File attachments come first, followed by the text query
        content_parts: List[Dict[str, Any]] = [
            {
                "type": "document",
                "source": {"type": "file", "file_id": file_id},
            }
            for file_id in service_file_ids or []
        ]
        content_parts.append({"type": "text", "text": query_text})

        messages: List[Dict[str, Any]] = [
            {"role": "user", "content": content_parts}
        ]
        # Add assistant message if provided to constrain response
        if assistant_prompt:
            messages.append({"role": "assistant", "content": assistant_prompt})
        return messages

    @staticmethod
    def _extract_response_text(response: Any) -> str:
        """Return the text of a response that must be one 'text' block."""
        blocks = response.content
        if len(blocks) != 1:
            raise ValueError(f"Expected exactly 1 content block, got {len(blocks)}")

        content_block = blocks[0]
        # A block without a ``type`` attribute is reported as 'unknown'
        block_type = getattr(content_block, "type", "unknown")
        if block_type != "text":
            raise ValueError(
                f"Expected content block type 'text', got '{block_type}'"
            )
        if not hasattr(content_block, "text"):
            raise ValueError("Text content block missing 'text' attribute")
        return str(content_block.text)

    async def register_file(
        self, config: KnowledgeServiceConfig, document: Document
    ) -> FileRegistrationResult:
        """Register a document file with Anthropic.

        Uploads the document's content stream through the beta Files API and
        returns the file ID assigned by Anthropic.

        Args:
            config: KnowledgeServiceConfig for this operation
            document: Document domain object to register

        Returns:
            FileRegistrationResult with Anthropic-specific details

        Raises:
            ValueError: If ANTHROPIC_API_KEY is not set or the document has
                no content stream
            Exception: Any error raised by the upload is logged and
                re-raised unchanged
        """
        logger.debug(
            "Registering file with Anthropic",
            extra={
                "knowledge_service_id": config.knowledge_service_id,
                "document_id": document.document_id,
            },
        )
        try:
            # Get Anthropic client for this operation
            client = self._get_client(config)

            # The client is created per call, so close it (and its HTTP
            # connections) as soon as the upload is done.
            async with client:
                if not document.content:
                    raise ValueError(
                        "Document content stream is required for upload"
                    )
                # Ensure content stream is positioned at beginning for upload
                document.content.seek(0)

                # Anthropic only supports PDF and plaintext files
                # Convert JSON content type to text/plain for compatibility
                content_type = document.content_type
                if content_type == "application/json":
                    content_type = "text/plain"

                # Upload file using Anthropic beta Files API
                # Use tuple format: (filename, file_stream, media_type)
                file_response = await client.beta.files.upload(
                    file=(
                        document.original_filename,
                        document.content.stream,  # type: ignore[arg-type]
                        content_type,
                    )
                )

            anthropic_file_id = file_response.id
            result = FileRegistrationResult(
                document_id=document.document_id,
                knowledge_service_file_id=anthropic_file_id,
                registration_metadata={
                    "service": "anthropic",
                    "registered_via": "beta_files_api",
                    "filename": document.original_filename,
                    # The document's own content type, not the upload override
                    "content_type": document.content_type,
                    "size_bytes": document.size_bytes,
                    "content_multihash": document.content_multihash,
                    "anthropic_file_id": anthropic_file_id,
                },
                created_at=datetime.now(timezone.utc),
            )
            logger.info(
                "File registered with Anthropic beta Files API",
                extra={
                    "knowledge_service_id": config.knowledge_service_id,
                    "document_id": document.document_id,
                    "anthropic_file_id": anthropic_file_id,
                    "original_filename": document.original_filename,
                    "size_bytes": document.size_bytes,
                },
            )
            return result
        except Exception as e:
            logger.error(
                "Failed to register file with Anthropic",
                extra={
                    "knowledge_service_id": config.knowledge_service_id,
                    "document_id": document.document_id,
                    "error": str(e),
                },
                exc_info=True,
            )
            raise

    async def execute_query(
        self,
        config: KnowledgeServiceConfig,
        query_text: str,
        service_file_ids: Optional[List[str]] = None,
        query_metadata: Optional[Dict[str, Any]] = None,
        assistant_prompt: Optional[str] = None,
    ) -> QueryResult:
        """Execute a query against Anthropic.

        Args:
            config: KnowledgeServiceConfig for this operation
            query_text: The query to execute
            service_file_ids: Optional list of Anthropic file IDs to provide
                as context for the query
            query_metadata: Optional Anthropic-specific configuration. The
                keys read here are ``model``, ``max_tokens`` and
                ``temperature``.
            assistant_prompt: Optional assistant message content to constrain
                or prime the model's response

        Returns:
            QueryResult with Anthropic query results

        Raises:
            ValueError: If ANTHROPIC_API_KEY is not set, or the response is
                not exactly one content block of type 'text'
            Exception: Any error raised by the API call is logged and
                re-raised unchanged
        """
        file_count = len(service_file_ids) if service_file_ids else 0
        logger.debug(
            "Executing query with Anthropic",
            extra={
                "knowledge_service_id": config.knowledge_service_id,
                "query_text": query_text,
                "document_count": file_count,
                "file_count": file_count,
            },
        )

        # Monotonic clock: elapsed time must not be skewed by wall-clock
        # adjustments while the request is in flight.
        start_time = time.monotonic()
        query_id = f"anthropic_{uuid.uuid4().hex[:12]}"

        # Extract configuration from query_metadata
        metadata = query_metadata or {}
        model = metadata.get("model", DEFAULT_MODEL)
        max_tokens = metadata.get("max_tokens", DEFAULT_MAX_TOKENS)
        temperature = metadata.get("temperature")

        try:
            # Get Anthropic client for this operation
            client = self._get_client(config)

            create_params: Dict[str, Any] = {
                "model": model,
                "max_tokens": max_tokens,
                "messages": self._build_messages(
                    query_text, service_file_ids, assistant_prompt
                ),
            }
            # Add temperature if specified
            if temperature is not None:
                create_params["temperature"] = temperature

            # The client is created per call, so close it (and its HTTP
            # connections) as soon as the request completes.
            async with client:
                response = await client.messages.create(**create_params)
                # Calculate execution time
                execution_time_ms = int((time.monotonic() - start_time) * 1000)

            # Validate response has exactly one content block of type 'text'
            response_text = self._extract_response_text(response)
            logger.debug(
                "Single text content block validated and extracted",
                extra={
                    "knowledge_service_id": config.knowledge_service_id,
                    "query_id": query_id,
                    "response_length": len(response_text),
                },
            )

            # Structure the result with single text content
            result_data = {
                "response": response_text,
                "model": model,
                "service": "anthropic",
                "sources": service_file_ids or [],
                "usage": {
                    "input_tokens": response.usage.input_tokens,
                    "output_tokens": response.usage.output_tokens,
                },
                "stop_reason": response.stop_reason,
            }
            result = QueryResult(
                query_id=query_id,
                query_text=query_text,
                result_data=result_data,
                execution_time_ms=execution_time_ms,
                created_at=datetime.now(timezone.utc),
            )
            logger.info(
                "Query executed with Anthropic successfully",
                extra={
                    "knowledge_service_id": config.knowledge_service_id,
                    "query_id": query_id,
                    "execution_time_ms": execution_time_ms,
                    "input_tokens": response.usage.input_tokens,
                    "output_tokens": response.usage.output_tokens,
                    "file_count": file_count,
                },
            )
            return result
        except Exception as e:
            execution_time_ms = int((time.monotonic() - start_time) * 1000)
            logger.error(
                "Failed to execute query with Anthropic",
                extra={
                    "knowledge_service_id": config.knowledge_service_id,
                    "query_id": query_id,
                    "query_text": query_text,
                    "execution_time_ms": execution_time_ms,
                    "file_count": file_count,
                    "error": str(e),
                },
                exc_info=True,
            )
            raise