Source code for julee.services.knowledge_service.knowledge_service

"""
KnowledgeService protocol for external service operations in the Capture,
Extract, Assemble, Publish workflow.

This module defines the KnowledgeService protocol that handles interactions
with external knowledge services, including document registration and query
execution. This protocol is separate from the repository layer which only
handles local metadata persistence.

Concrete implementations of this protocol are provided for different external
services (Anthropic, OpenAI, etc.) and are created via factory functions.
"""

from datetime import datetime, timezone
from typing import (
    TYPE_CHECKING,
    Any,
    Protocol,
    runtime_checkable,
)

from pydantic import BaseModel, Field

if TYPE_CHECKING:
    from julee.domain.models.knowledge_service_config import (
        KnowledgeServiceConfig,
    )

from julee.domain.models.document import Document


[docs] class QueryResult(BaseModel): """Result of a knowledge service query execution."""
[docs] query_id: str = Field(description="Unique identifier for this query execution")
[docs] query_text: str = Field(description="The original query text that was executed")
[docs] result_data: dict[str, Any] = Field( default_factory=dict, description="The structured result data from the query", )
[docs] execution_time_ms: int | None = Field( default=None, description="Time taken to execute the query in milliseconds", )
[docs] created_at: datetime | None = Field( default_factory=lambda: datetime.now(timezone.utc) )
[docs] class FileRegistrationResult(BaseModel): """Result of registering a file with a knowledge service."""
[docs] document_id: str = Field(description="The original document ID from our system")
[docs] knowledge_service_file_id: str = Field( description="The file identifier assigned by the knowledge service" )
[docs] registration_metadata: dict[str, Any] = Field( default_factory=dict, description="Additional metadata from the registration process", )
[docs] created_at: datetime | None = Field( default_factory=lambda: datetime.now(timezone.utc) )
@runtime_checkable
[docs] class KnowledgeService(Protocol): """ Protocol for interacting with external knowledge services. This protocol defines the interface for external operations that were moved out of the repository layer. Implementations handle the specifics of different knowledge service APIs (Anthropic, OpenAI, etc.). """
[docs] async def register_file( self, config: "KnowledgeServiceConfig", document: Document ) -> FileRegistrationResult: """Register a document file with the external knowledge service. This method registers a document with the external knowledge service, allowing that service to analyze and index the document content for future queries. Args: config: KnowledgeServiceConfig for the service to use document: Document domain object to register Returns: FileRegistrationResult containing registration details and the service's internal file identifier .. rubric:: Implementation Notes - Must be idempotent: re-registering same document returns same result - Should handle service unavailability gracefully - Must return the service's internal file ID for future queries - Document content is accessed directly from the Document object - Should handle various document formats and sizes .. rubric:: Workflow Context In Temporal workflows, this method is implemented as an activity to ensure registration results are durably stored and consistent across workflow replays. """ ...
[docs] async def execute_query( self, config: "KnowledgeServiceConfig", query_text: str, output_schema: dict[str, Any] | None = None, service_file_ids: list[str] | None = None, query_metadata: dict[str, Any] | None = None, assistant_prompt: str | None = None, ) -> QueryResult: """Execute a query against the external knowledge service. This method executes a text query against the knowledge service, optionally scoping the query to specific documents that have been previously registered with the service. Args: config: KnowledgeServiceConfig for the service to use query_text: The query to execute (natural language or structured) output_schema: Optional JSON schema for structured response. When provided, the service will attempt to return results conforming to this schema using structured outputs or schema-guided prompting. service_file_ids: Optional list of service file IDs to provide as context for the query. These are the IDs returned by the knowledge service from register_file operations, and are included in the query to give the service access to specific documents. query_metadata: Optional service-specific metadata and configuration options such as model selection, temperature, max_tokens, etc. The structure depends on the specific knowledge service being used. assistant_prompt: Optional assistant message content to constrain or prime the model's response. This is added as the final assistant message before the model generates its response, allowing control over response format and structure. Returns: QueryResult containing query results and execution metadata .. rubric:: Implementation Notes - Must be idempotent: same query returns consistent results - Service file IDs are provided as context to enhance query responses - Should handle service unavailability gracefully - Query results should be structured as domain objects - Should track execution time and metadata - Must handle various query formats (natural language, structured, etc.) - Should validate that service_file_ids exist in the service before including them in the query context .. rubric:: Workflow Context In Temporal workflows, this method is implemented as an activity to ensure query results are durably stored and can be replayed consistently. """ ...