# Source code for julee.services.knowledge_service.memory.knowledge_service

"""
Memory implementation of KnowledgeService for testing and development.

This module provides an in-memory implementation of the KnowledgeService
protocol that stores file registrations in a dictionary and returns
configurable canned query responses. Useful for testing and development
scenarios where external service dependencies should be avoided.
"""

import json
import logging
from collections import deque
from datetime import datetime, timezone
from typing import Any

from julee.domain.models.document import Document
from julee.domain.models.knowledge_service_config import (
    KnowledgeServiceConfig,
)

from ..knowledge_service import (
    FileRegistrationResult,
    KnowledgeService,
    QueryResult,
)

# Module-level logger; named after this module so log records can be
# filtered per module.
logger = logging.getLogger(__name__)
class MemoryKnowledgeService(KnowledgeService):
    """In-memory implementation of the KnowledgeService protocol.

    File registrations are stored in a dictionary keyed by
    ``knowledge_service_file_id``. Query results are not computed: they are
    served first-in, first-out from a queue of canned responses that the
    caller configures via ``add_canned_query_result(s)``.

    Useful for testing and development scenarios where you want to avoid
    external service dependencies while still exercising the full knowledge
    service workflow.
    """

    def __init__(
        self,
        config: KnowledgeServiceConfig,
    ) -> None:
        """Initialize memory knowledge service with configuration.

        Args:
            config: KnowledgeServiceConfig domain object containing metadata
                and service configuration
        """
        logger.debug(
            "Initializing MemoryKnowledgeService",
            extra={
                "knowledge_service_id": config.knowledge_service_id,
                "service_name": config.name,
            },
        )

        self.config = config

        # Storage for file registrations, keyed by knowledge_service_file_id
        self._registered_files: dict[str, FileRegistrationResult] = {}

        # Queue of canned query results; execute_query pops from the left,
        # so results are returned in the order they were added.
        self._canned_query_results: deque[QueryResult] = deque()

    def add_canned_query_result(self, query_result: QueryResult) -> None:
        """Add a canned query result to be returned by execute_query.

        Args:
            query_result: QueryResult to return from a future execute_query
                call (after any results queued before it)
        """
        logger.debug(
            "Adding canned query result",
            extra={
                "knowledge_service_id": self.config.knowledge_service_id,
                "query_id": query_result.query_id,
            },
        )
        self._canned_query_results.append(query_result)

    def add_canned_query_results(self, query_results: list[QueryResult]) -> None:
        """Add multiple canned query results to be returned by execute_query.

        Args:
            query_results: List of QueryResult objects to return from future
                execute_query calls, in list order
        """
        logger.debug(
            "Adding multiple canned query results",
            extra={
                "knowledge_service_id": self.config.knowledge_service_id,
                "count": len(query_results),
            },
        )
        self._canned_query_results.extend(query_results)

    def clear_canned_query_results(self) -> None:
        """Clear all canned query results."""
        logger.debug(
            "Clearing canned query results",
            extra={
                "knowledge_service_id": self.config.knowledge_service_id,
                "count": len(self._canned_query_results),
            },
        )
        self._canned_query_results.clear()

    def get_registered_file(
        self, knowledge_service_file_id: str
    ) -> FileRegistrationResult | None:
        """Get a registered file by its knowledge service file ID.

        Args:
            knowledge_service_file_id: The file ID assigned by this service

        Returns:
            FileRegistrationResult if found, None otherwise
        """
        return self._registered_files.get(knowledge_service_file_id)

    def get_all_registered_files(self) -> dict[str, FileRegistrationResult]:
        """Get all registered files.

        Returns:
            A shallow copy of the mapping from knowledge_service_file_id to
            FileRegistrationResult; mutating it does not affect this service
        """
        return self._registered_files.copy()

    def _find_registration(self, document_id: str) -> FileRegistrationResult | None:
        """Return the existing registration for ``document_id``, or None."""
        # Linear scan: registrations are keyed by file ID, not document ID.
        for existing_result in self._registered_files.values():
            if existing_result.document_id == document_id:
                return existing_result
        return None

    async def register_file(
        self, config: KnowledgeServiceConfig, document: Document
    ) -> FileRegistrationResult:
        """Register a document file by storing metadata in memory.

        If a registration with the same ``document_id`` already exists, that
        existing result is returned unchanged and nothing new is stored.

        Args:
            config: KnowledgeServiceConfig for this operation
            document: Document domain object to register

        Returns:
            FileRegistrationResult with memory-specific details; its
            knowledge_service_file_id has the form
            ``memory_<document_id>_<unix_seconds>``
        """
        logger.debug(
            "Registering file with memory service",
            extra={
                "knowledge_service_id": config.knowledge_service_id,
                "document_id": document.document_id,
            },
        )

        # Check if already registered
        existing_result = self._find_registration(document.document_id)
        if existing_result is not None:
            logger.debug(
                "Document already registered, returning existing result",
                extra={
                    "knowledge_service_id": config.knowledge_service_id,
                    "document_id": document.document_id,
                    "knowledge_service_file_id": (
                        existing_result.knowledge_service_file_id
                    ),
                },
            )
            return existing_result

        # Read the clock once so the file ID, the metadata timestamp and
        # created_at all describe the same instant (previously two separate
        # reads could straddle a second boundary).
        now = datetime.now(timezone.utc)
        timestamp = int(now.timestamp())
        memory_file_id = f"memory_{document.document_id}_{timestamp}"

        # Create registration result
        result = FileRegistrationResult(
            document_id=document.document_id,
            knowledge_service_file_id=memory_file_id,
            registration_metadata={
                "service": "memory",
                "registered_via": "in_memory_storage",
                "timestamp": timestamp,
                "knowledge_service_id": config.knowledge_service_id,
                "filename": document.original_filename,
                "content_type": document.content_type,
                "size_bytes": document.size_bytes,
            },
            created_at=now,
        )

        # Store in memory dictionary keyed by knowledge_service_file_id
        self._registered_files[memory_file_id] = result

        logger.info(
            "File registered with MemoryKnowledgeService",
            extra={
                "knowledge_service_id": config.knowledge_service_id,
                "document_id": document.document_id,
                "knowledge_service_file_id": memory_file_id,
                "total_registered": len(self._registered_files),
            },
        )

        return result

    @staticmethod
    def _embed_output_schema(query_text: str, output_schema: dict[str, Any]) -> str:
        """Append ``output_schema`` to ``query_text`` as JSON-only instructions."""
        schema_json = json.dumps(output_schema, indent=2)
        # NOTE(review): intended to mirror the schema prompt used by the
        # Anthropic service; confirm the exact line breaks match it.
        return (
            f"{query_text}\n\n"
            "Please structure your response according to this JSON schema:\n"
            f"{schema_json}\n\n"
            "Return only valid JSON that conforms to this schema, without any "
            "surrounding text or markdown formatting."
        )

    async def execute_query(
        self,
        config: KnowledgeServiceConfig,
        query_text: str,
        output_schema: dict[str, Any] | None = None,
        service_file_ids: list[str] | None = None,
        query_metadata: dict[str, Any] | None = None,
        assistant_prompt: str | None = None,
    ) -> QueryResult:
        """Execute a query by returning the next canned response.

        Each call consumes one queued canned result. The returned QueryResult
        keeps the canned ``query_id`` and ``execution_time_ms``, is stamped
        with the current UTC time, and its ``result_data`` is the canned data
        overlaid with ``response`` (None if the canned data has none),
        ``queried_documents``, ``service`` and ``knowledge_service_id``.

        Args:
            config: KnowledgeServiceConfig for this operation
            query_text: The query to execute
            output_schema: Optional JSON schema for structured response; when
                non-empty it is embedded into the recorded query text
            service_file_ids: Optional list of service file IDs for query
            query_metadata: Optional service-specific metadata (ignored in
                memory implementation)
            assistant_prompt: Optional assistant message content (ignored in
                memory implementation)

        Returns:
            QueryResult built from the next canned response in the queue

        Raises:
            ValueError: If no canned query results are available
        """
        # An empty schema dict is treated the same as no schema.
        has_schema = bool(output_schema)
        enhanced_query_text = (
            self._embed_output_schema(query_text, output_schema)
            if has_schema
            else query_text
        )

        logger.debug(
            "Executing query with MemoryKnowledgeService",
            extra={
                "knowledge_service_id": config.knowledge_service_id,
                "query_text": enhanced_query_text,
                "document_count": (len(service_file_ids) if service_file_ids else 0),
                "has_output_schema": has_schema,
            },
        )

        # Check if we have canned results available
        if not self._canned_query_results:
            error_msg = (
                "No canned query results available. Use "
                "add_canned_query_result() to configure responses."
            )
            logger.error(
                error_msg,
                extra={
                    "knowledge_service_id": config.knowledge_service_id,
                    "query_text": query_text,
                },
            )
            raise ValueError(error_msg)

        # Pop and return the next canned result (FIFO)
        result = self._canned_query_results.popleft()

        # For the memory service the canned response is expected to already be
        # a parsed object; it is passed through as-is regardless of schema.
        response_value = result.result_data.get("response")

        # Update the result to reflect the actual query parameters
        updated_result = QueryResult(
            query_id=result.query_id,
            query_text=enhanced_query_text,
            result_data={
                **result.result_data,
                "response": response_value,
                "queried_documents": service_file_ids or [],
                "service": "memory",
                "knowledge_service_id": config.knowledge_service_id,
            },
            execution_time_ms=result.execution_time_ms,
            created_at=datetime.now(timezone.utc),
        )

        logger.info(
            "Query executed with MemoryKnowledgeService",
            extra={
                "knowledge_service_id": config.knowledge_service_id,
                "query_id": updated_result.query_id,
                "execution_time_ms": updated_result.execution_time_ms,
                "remaining_canned_results": len(self._canned_query_results),
            },
        )

        return updated_result